http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
new file mode 100644
index 0000000..1ac6b36
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup.maxmind;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.StopWatch;
+
+import com.maxmind.geoip2.model.AnonymousIpResponse;
+import com.maxmind.geoip2.model.CityResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse;
+import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
+import com.maxmind.geoip2.model.DomainResponse;
+import com.maxmind.geoip2.model.IspResponse;
+import com.maxmind.geoip2.record.Country;
+import com.maxmind.geoip2.record.Location;
+import com.maxmind.geoip2.record.Subdivision;
+
+@Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", 
"cellular", "anonymous", "tor"})
+@CapabilityDescription("A lookup service that provides several types of 
enrichment information for IP addresses. The service is configured by providing 
a MaxMind "
+    + "Database file and specifying which types of enrichment should be 
provided for an IP Address. Each type of enrichment is a separate lookup, so 
configuring the "
+    + "service to provide all of the available enrichment data may be slower 
than returning only a portion of the available enrichments. View the Usage of 
this component "
+    + "and choose to view Additional Details for more information, such as the 
Schema that pertains to the information that is returned.")
+public class IPLookupService extends AbstractControllerService implements 
RecordLookupService {
+    private volatile DatabaseReader databaseReader = null;
+
+    static final PropertyDescriptor GEO_DATABASE_FILE = new 
PropertyDescriptor.Builder()
+        .name("database-file")
+        .displayName("MaxMind Database File")
+        .description("Path to Maxmind IP Enrichment Database File")
+        .required(true)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .build();
+    static final PropertyDescriptor LOOKUP_CITY = new 
PropertyDescriptor.Builder()
+        .name("lookup-city")
+        .displayName("Lookup Geo Enrichment")
+        .description("Specifies whether or not information about the 
geographic information, such as cities, corresponding to the IP address should 
be returned")
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+    static final PropertyDescriptor LOOKUP_ISP = new 
PropertyDescriptor.Builder()
+        .name("lookup-isp")
+        .displayName("Lookup ISP")
+        .description("Specifies whether or not information about the 
Information Service Provider corresponding to the IP address should be 
returned")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+    static final PropertyDescriptor LOOKUP_DOMAIN = new 
PropertyDescriptor.Builder()
+        .name("lookup-domain")
+        .displayName("Lookup Domain Name")
+        .description("Specifies whether or not information about the Domain 
Name corresponding to the IP address should be returned. "
+            + "If true, the lookup will contain second-level domain 
information, such as foo.com but will not contain bar.foo.com")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+    static final PropertyDescriptor LOOKUP_CONNECTION_TYPE = new 
PropertyDescriptor.Builder()
+        .name("lookup-connection-type")
+        .displayName("Lookup Connection Type")
+        .description("Specifies whether or not information about the 
Connection Type corresponding to the IP address should be returned. "
+            + "If true, the lookup will contain a 'connectionType' field that 
(if populated) will contain a value of 'Dialup', 'Cable/DSL', 'Corporate', or 
'Cellular'")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+    static final PropertyDescriptor LOOKUP_ANONYMOUS_IP_INFO = new 
PropertyDescriptor.Builder()
+        .name("lookup-anonymous-ip")
+        .displayName("Lookup Anonymous IP Information")
+        .description("Specifies whether or not information about whether or 
not the IP address belongs to an anonymous network should be returned.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(GEO_DATABASE_FILE);
+        properties.add(LOOKUP_CITY);
+        properties.add(LOOKUP_ISP);
+        properties.add(LOOKUP_DOMAIN);
+        properties.add(LOOKUP_CONNECTION_TYPE);
+        properties.add(LOOKUP_ANONYMOUS_IP_INFO);
+        return properties;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
IOException {
+        final String dbFileString = 
context.getProperty(GEO_DATABASE_FILE).getValue();
+        final File dbFile = new File(dbFileString);
+        final StopWatch stopWatch = new StopWatch(true);
+        final DatabaseReader reader = new 
DatabaseReader.Builder(dbFile).build();
+        stopWatch.stop();
+        getLogger().info("Completed loading of Maxmind Database.  Elapsed time 
was {} milliseconds.", new Object[] 
{stopWatch.getDuration(TimeUnit.MILLISECONDS)});
+        databaseReader = reader;
+    }
+
+    @OnStopped
+    public void closeReader() throws IOException {
+        final DatabaseReader reader = databaseReader;
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    @Override
+    public Optional<Record> lookup(final String key) throws 
LookupFailureException {
+        if (key == null) {
+            return Optional.empty();
+        }
+
+        final InetAddress inetAddress;
+        try {
+            inetAddress = InetAddress.getByName(key);
+        } catch (final IOException ioe) {
+            getLogger().warn("Could not resolve the IP for value '{}'. This is 
usually caused by issue resolving the appropriate DNS record or " +
+                "providing the service with an invalid IP address", new 
Object[] {key}, ioe);
+
+            return Optional.empty();
+        }
+
+        final Record geoRecord;
+        if (getProperty(LOOKUP_CITY).asBoolean()) {
+            final CityResponse cityResponse;
+            try {
+                cityResponse = databaseReader.city(inetAddress);
+            } catch (final Exception e) {
+                throw new LookupFailureException("Failed to lookup City 
information for IP Address " + inetAddress, e);
+            }
+
+            geoRecord = createRecord(cityResponse);
+        } else {
+            geoRecord = null;
+        }
+
+        final Record ispRecord;
+        if (getProperty(LOOKUP_ISP).asBoolean()) {
+            final IspResponse ispResponse;
+            try {
+                ispResponse = databaseReader.isp(inetAddress);
+            } catch (final Exception e) {
+                throw new LookupFailureException("Failed to lookup ISP 
information for IP Address " + inetAddress, e);
+            }
+
+            ispRecord = createRecord(ispResponse);
+        } else {
+            ispRecord = null;
+        }
+
+        final String domainName;
+        if (getProperty(LOOKUP_DOMAIN).asBoolean()) {
+            final DomainResponse domainResponse;
+            try {
+                domainResponse = databaseReader.domain(inetAddress);
+            } catch (final Exception e) {
+                throw new LookupFailureException("Failed to lookup Domain 
information for IP Address " + inetAddress, e);
+            }
+
+            domainName = domainResponse == null ? null : 
domainResponse.getDomain();
+        } else {
+            domainName = null;
+        }
+
+        final String connectionType;
+        if (getProperty(LOOKUP_CONNECTION_TYPE).asBoolean()) {
+            final ConnectionTypeResponse connectionTypeResponse;
+            try {
+                connectionTypeResponse = 
databaseReader.connectionType(inetAddress);
+            } catch (final Exception e) {
+                throw new LookupFailureException("Failed to lookup Domain 
information for IP Address " + inetAddress, e);
+            }
+
+            if (connectionTypeResponse == null) {
+                connectionType = null;
+            } else {
+                final ConnectionType type = 
connectionTypeResponse.getConnectionType();
+                connectionType = type == null ? null : type.name();
+            }
+        } else {
+            connectionType = null;
+        }
+
+        final Record anonymousIpRecord;
+        if (getProperty(LOOKUP_ANONYMOUS_IP_INFO).asBoolean()) {
+            final AnonymousIpResponse anonymousIpResponse;
+            try {
+                anonymousIpResponse = databaseReader.anonymousIp(inetAddress);
+            } catch (final Exception e) {
+                throw new LookupFailureException("Failed to lookup Anonymous 
IP Information for IP Address " + inetAddress, e);
+            }
+
+            anonymousIpRecord = createRecord(anonymousIpResponse);
+        } else {
+            anonymousIpRecord = null;
+        }
+
+        return Optional.ofNullable(createContainerRecord(geoRecord, ispRecord, 
domainName, connectionType, anonymousIpRecord));
+    }
+
+    private Record createRecord(final CityResponse city) {
+        if (city == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put(CitySchema.CITY.getFieldName(), city.getCity().getName());
+
+        final Location location = city.getLocation();
+        values.put(CitySchema.ACCURACY.getFieldName(), 
location.getAccuracyRadius());
+        values.put(CitySchema.METRO_CODE.getFieldName(), 
location.getMetroCode());
+        values.put(CitySchema.TIMEZONE.getFieldName(), location.getTimeZone());
+        values.put(CitySchema.LATITUDE.getFieldName(), location.getLatitude());
+        values.put(CitySchema.LONGITUDE.getFieldName(), 
location.getLongitude());
+        values.put(CitySchema.CONTINENT.getFieldName(), 
city.getContinent().getName());
+        values.put(CitySchema.POSTALCODE.getFieldName(), 
city.getPostal().getCode());
+        values.put(CitySchema.COUNTRY.getFieldName(), 
createRecord(city.getCountry()));
+
+        final Object[] subdivisions = new 
Object[city.getSubdivisions().size()];
+        int i = 0;
+        for (final Subdivision subdivision : city.getSubdivisions()) {
+            subdivisions[i++] = createRecord(subdivision);
+        }
+        values.put(CitySchema.SUBDIVISIONS.getFieldName(), subdivisions);
+
+        return new MapRecord(CitySchema.GEO_SCHEMA, values);
+    }
+
+    private Record createRecord(final Subdivision subdivision) {
+        if (subdivision == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>(2);
+        values.put(CitySchema.SUBDIVISION_NAME.getFieldName(), 
subdivision.getName());
+        values.put(CitySchema.SUBDIVISION_ISO.getFieldName(), 
subdivision.getIsoCode());
+        return new MapRecord(CitySchema.SUBDIVISION_SCHEMA, values);
+    }
+
+    private Record createRecord(final Country country) {
+        if (country == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>(2);
+        values.put(CitySchema.COUNTRY_NAME.getFieldName(), country.getName());
+        values.put(CitySchema.COUNTRY_ISO.getFieldName(), 
country.getIsoCode());
+        return new MapRecord(CitySchema.COUNTRY_SCHEMA, values);
+    }
+
+    private Record createRecord(final IspResponse isp) {
+        if (isp == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>(4);
+        values.put(IspSchema.ASN.getFieldName(), 
isp.getAutonomousSystemNumber());
+        values.put(IspSchema.ASN_ORG.getFieldName(), 
isp.getAutonomousSystemOrganization());
+        values.put(IspSchema.NAME.getFieldName(), isp.getIsp());
+        values.put(IspSchema.ORG.getFieldName(), isp.getOrganization());
+
+        return new MapRecord(IspSchema.ISP_SCHEMA, values);
+    }
+
+    private Record createRecord(final AnonymousIpResponse anonymousIp) {
+        if (anonymousIp == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>(5);
+        values.put(AnonymousIpSchema.ANONYMOUS.getFieldName(), 
anonymousIp.isAnonymous());
+        values.put(AnonymousIpSchema.ANONYMOUS_VPN.getFieldName(), 
anonymousIp.isAnonymousVpn());
+        values.put(AnonymousIpSchema.HOSTING_PROVIDER.getFieldName(), 
anonymousIp.isHostingProvider());
+        values.put(AnonymousIpSchema.PUBLIC_PROXY.getFieldName(), 
anonymousIp.isPublicProxy());
+        values.put(AnonymousIpSchema.TOR_EXIT_NODE.getFieldName(), 
anonymousIp.isTorExitNode());
+
+        return new MapRecord(AnonymousIpSchema.ANONYMOUS_IP_SCHEMA, values);
+    }
+
+    private Record createContainerRecord(final Record geoRecord, final Record 
ispRecord, final String domainName, final String connectionType, final Record 
anonymousIpRecord) {
+        final Map<String, Object> values = new HashMap<>(4);
+        values.put("geo", geoRecord);
+        values.put("isp", ispRecord);
+        values.put("domainName", domainName);
+        values.put("connectionType", connectionType);
+        values.put("anonymousIp", anonymousIpRecord);
+
+        final Record containerRecord = new 
MapRecord(ContainerSchema.CONTAINER_SCHEMA, values);
+        return containerRecord;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java
new file mode 100644
index 0000000..a487716
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IspSchema.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.lookup.maxmind;
+
+import java.util.Arrays;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
/**
 * Record field and schema definitions for the ISP (Internet Service Provider)
 * enrichment record produced by the IP lookup service in this package.
 * All members are package-private constants shared with the service implementation.
 */
public class IspSchema {
    // Name of the ISP associated with the IP address
    static final RecordField NAME = new RecordField("name", RecordFieldType.STRING.getDataType());
    // Name of the organization associated with the IP address
    static final RecordField ORG = new RecordField("organization", RecordFieldType.STRING.getDataType());
    // Autonomous System Number (integer) associated with the IP address
    static final RecordField ASN = new RecordField("asn", RecordFieldType.INT.getDataType());
    // Organization registered for the Autonomous System
    static final RecordField ASN_ORG = new RecordField("asnOrganization", RecordFieldType.STRING.getDataType());

    // Flat schema composed of the four fields above, in declaration order
    static final RecordSchema ISP_SCHEMA = new SimpleRecordSchema(Arrays.asList(NAME, ORG, ASN, ASN_ORG));
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..cdff77c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.lookup.maxmind.IPLookupService
+org.apache.nifi.lookup.SimpleKeyValueLookupService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
new file mode 100644
index 0000000..8f64510
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/docs/org.apache.nifi.lookup.maxmind.IPLookupService/additionalDetails.html
@@ -0,0 +1,102 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>IPLookupService</title>
+
+        <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+      <p>
+        The IPLookupService is powered by a MaxMind database and can return 
several different types of enrichment information
+        about a given IP address. Below is the schema of the Record that is 
returned by this service (in Avro Schema format).
+      </p>
+  
+<code>
+<pre>
+{
+  "name": "ipEnrichment",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    {
+      "name": "geo",
+      "type": {
+        "name": "cityGeo",
+        "type": "record",
+        "fields": [
+          { "name": "city", "type": "string" },
+          { "name": "accuracy", "type": "int", "doc": "The radius, in 
kilometers, around the given location, where the IP address is believed to be" 
},
+          { "name": "metroCode", "type": "int" },
+          { "name": "timeZone", "type": "string" },
+          { "name": "latitude", "type": "double" },
+          { "name": "longitude", "type": "double" },
+          { "name": "country", "type": {
+            "type": "record",
+            "name": "country",
+            "fields": [
+              { "name": "name", "type": "string" },
+              { "name": "isoCode", "type": "string" }
+            ]
+          } },
+            { "name": "subdivisions", "type": {
+              "type": "array",
+              "items": {
+                "type": "record",
+                "name": "subdivision",
+                "fields": [
+                  { "name": "name", "type": "string" },
+                  { "name": "isoCode", "type": "string" }
+                ]
+              }
+            }
+          },
+          { "name": "continent", "type": "string" },
+          { "name": "postalCode", "type": "string" }
+        ]
+      }
+    },
+    {
+      "name": "isp",
+      "type": {
+          "name": "ispEnrich",
+        "type": "record",
+        "fields": [
+          { "name": "name", "type": "string" },
+          { "name": "organization", "type": "string" },
+          { "name": "asn", "type": "int" },
+          { "name": "asnOrganization", "type": "string" }
+        ]
+    }
+    },
+    {
+      "name": "domainName",
+      "type": "string"
+    },
+    {
+      "name": "connectionType",
+      "type": "string",
+      "doc": "One of 'Dialup', 'Cable/DSL', 'Corporate', 'Cellular'"
+    },
+    {
+      "name": "anonymousIp",
+      "type": {
+        "name": "anonymousIpInfo",
+        "type": "record",
+        "fields": [
+          { "name": "anonymous", "type": "boolean" },
+          { "name": "anonymousVpn", "type": "boolean" },
+          { "name": "hostingProvider", "type": "boolean" },
+          { "name": "publicProxy", "type": "boolean" },
+          { "name": "torExitNode", "type": "boolean" }
+        ]
+      }
+    }
+  ]
+}
+</pre>
+</code>
+
+  </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml
new file mode 100644
index 0000000..a317cb2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/pom.xml
@@ -0,0 +1,28 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-standard-services</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-lookup-services-bundle</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-lookup-services</module>
+        <module>nifi-lookup-services-nar</module>
+    </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index 9ea6e64..c819a6c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.serialization;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.flowfile.FlowFile;
@@ -66,9 +67,11 @@ public interface RecordSetWriterFactory extends 
ControllerService {
      * @param logger the logger to use when logging information. This is 
passed in, rather than using the logger of the Controller Service
      *            because it allows messages to be logged for the component 
that is calling this Controller Service.
      * @param schema the schema that will be used for writing records
+     * @param flowFile the FlowFile to write to
+     * @param out the OutputStream to write to
      *
      * @return a RecordSetWriter that can write record sets to an OutputStream
      * @throws IOException if unable to read from the given InputStream
      */
-    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) 
throws SchemaNotFoundException, IOException;
+    RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, 
FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 1360add..70da1e8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.avro;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.LinkedHashMap;
@@ -31,6 +32,7 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.schema.access.SchemaField;
@@ -57,7 +59,7 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
         "The FlowFile will have the Avro schema embedded into the content, as 
is typical with Avro");
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema recordSchema) throws IOException {
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) 
throws IOException {
         final String strategyValue = 
getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
 
         try {
@@ -78,9 +80,9 @@ public class AvroRecordSetWriter extends 
SchemaRegistryRecordSetWriter implement
             }
 
             if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
-                return new WriteAvroResultWithSchema(avroSchema);
+                return new WriteAvroResultWithSchema(avroSchema, out);
             } else {
-                return new WriteAvroResultWithExternalSchema(avroSchema, 
recordSchema, getSchemaAccessWriter(recordSchema));
+                return new WriteAvroResultWithExternalSchema(avroSchema, 
recordSchema, getSchemaAccessWriter(recordSchema), out);
             }
         } catch (final SchemaNotFoundException e) {
             throw new ProcessException("Could not determine the Avro Schema to 
use for writing the content", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
index c09e3d5..799d3ee 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
@@ -32,9 +32,11 @@ import java.util.Collections;
 
 public abstract class WriteAvroResult implements RecordSetWriter {
     private final Schema schema;
+    private final OutputStream out;
 
-    public WriteAvroResult(final Schema schema) {
+    public WriteAvroResult(final Schema schema, final OutputStream out) {
         this.schema = schema;
+        this.out = out;
     }
 
     protected Schema getSchema() {
@@ -42,7 +44,7 @@ public abstract class WriteAvroResult implements 
RecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream out) 
throws IOException {
+    public WriteResult write(final Record record) throws IOException {
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
 
         final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
@@ -58,12 +60,4 @@ public abstract class WriteAvroResult implements 
RecordSetWriter {
     public String getMimeType() {
         return "application/avro-binary";
     }
-
-    public static String normalizeNameForAvro(String inputName) {
-        String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
-        if (Character.isDigit(normalizedName.charAt(0))) {
-            normalizedName = "_" + normalizedName;
-        }
-        return normalizedName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 1cfb02d..a3f9cb8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -20,7 +20,7 @@ package org.apache.nifi.avro;
 import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Collections;
+import java.util.Map;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -29,66 +29,53 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 
-public class WriteAvroResultWithExternalSchema extends WriteAvroResult {
+public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter 
{
     private final SchemaAccessWriter schemaAccessWriter;
     private final RecordSchema recordSchema;
-
-    public WriteAvroResultWithExternalSchema(final Schema avroSchema, final 
RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter) {
-        super(avroSchema);
+    private final Schema avroSchema;
+    private final BinaryEncoder encoder;
+    private final OutputStream buffered;
+    private final DatumWriter<GenericRecord> datumWriter;
+
+    public WriteAvroResultWithExternalSchema(final Schema avroSchema, final 
RecordSchema recordSchema,
+        final SchemaAccessWriter schemaAccessWriter, final OutputStream out) 
throws IOException {
+        super(out);
         this.recordSchema = recordSchema;
         this.schemaAccessWriter = schemaAccessWriter;
+        this.avroSchema = avroSchema;
+        this.buffered = new BufferedOutputStream(out);
+
+        datumWriter = new GenericDatumWriter<>(avroSchema);
+        schemaAccessWriter.writeHeader(recordSchema, buffered);
+        encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
     }
 
     @Override
-    public WriteResult write(final RecordSet rs, final OutputStream outStream) 
throws IOException {
-        Record record = rs.next();
-        if (record == null) {
-            return WriteResult.of(0, Collections.emptyMap());
-        }
-
-        int nrOfRows = 0;
-        final Schema schema = getSchema();
-        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-
-        final BufferedOutputStream bufferedOut = new 
BufferedOutputStream(outStream);
-        schemaAccessWriter.writeHeader(recordSchema, bufferedOut);
-
-        final BinaryEncoder encoder = 
EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
-
-        do {
-            final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
-
-            datumWriter.write(rec, encoder);
-            encoder.flush();
-            nrOfRows++;
-        } while ((record = rs.next()) != null);
-
-        bufferedOut.flush();
-
-        return WriteResult.of(nrOfRows, 
schemaAccessWriter.getAttributes(recordSchema));
+    protected void onBeginRecordSet() throws IOException {
+        schemaAccessWriter.writeHeader(recordSchema, buffered);
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream out) 
throws IOException {
-        final Schema schema = getSchema();
-        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-
-        final BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
-        schemaAccessWriter.writeHeader(recordSchema, bufferedOut);
-
-        final BinaryEncoder encoder = 
EncoderFactory.get().blockingBinaryEncoder(bufferedOut, null);
-        final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
-
-        datumWriter.write(rec, encoder);
+    protected Map<String, String> onFinishRecordSet() throws IOException {
         encoder.flush();
+        buffered.flush();
+        return schemaAccessWriter.getAttributes(recordSchema);
+    }
 
-        bufferedOut.flush();
-
+    @Override
+    public WriteResult write(final Record record) throws IOException {
+        final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
avroSchema);
+        datumWriter.write(rec, encoder);
         return WriteResult.of(1, 
schemaAccessWriter.getAttributes(recordSchema));
     }
+
+    @Override
+    public String getMimeType() {
+        return "application/avro-binary";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
index 73d70d9..9bfb4cf 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java
@@ -25,57 +25,38 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordSet;
 
-public class WriteAvroResultWithSchema extends WriteAvroResult {
+public class WriteAvroResultWithSchema extends AbstractRecordSetWriter {
 
-    public WriteAvroResultWithSchema(final Schema schema) {
-        super(schema);
-    }
-
-    @Override
-    public WriteResult write(final RecordSet rs, final OutputStream outStream) 
throws IOException {
-        Record record = rs.next();
-        if (record == null) {
-            return WriteResult.of(0, Collections.emptyMap());
-        }
-
-        int nrOfRows = 0;
-        final Schema schema = getSchema();
-        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+    private final DataFileWriter<GenericRecord> dataFileWriter;
+    private final Schema schema;
 
-        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
-            dataFileWriter.create(schema, outStream);
+    public WriteAvroResultWithSchema(final Schema schema, final OutputStream 
out) throws IOException {
+        super(out);
+        this.schema = schema;
 
-            do {
-                final GenericRecord rec = 
AvroTypeUtil.createAvroRecord(record, schema);
-                dataFileWriter.append(rec);
-                nrOfRows++;
-            } while ((record = rs.next()) != null);
-        }
-
-        return WriteResult.of(nrOfRows, Collections.emptyMap());
+        final GenericDatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        dataFileWriter = new DataFileWriter<>(datumWriter);
+        dataFileWriter.create(schema, out);
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream out) 
throws IOException {
-        if (record == null) {
-            return WriteResult.of(0, Collections.emptyMap());
-        }
-
-        final Schema schema = getSchema();
-        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
-
-        try (final DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<>(datumWriter)) {
-            dataFileWriter.create(schema, out);
-
-            final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
-            dataFileWriter.append(rec);
-        }
+    public void close() throws IOException {
+        dataFileWriter.close();
+    }
 
+    @Override
+    public WriteResult write(final Record record) throws IOException {
+        final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, 
schema);
+        dataFileWriter.append(rec);
         return WriteResult.of(1, Collections.emptyMap());
     }
+
+    @Override
+    public String getMimeType() {
+        return "application/avro-binary";
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index d4f066f..c5e6b19 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.csv;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,6 +28,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -67,8 +69,8 @@ public class CSVRecordSetWriter extends 
DateTimeTextRecordSetWriter implements R
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema) throws SchemaNotFoundException, IOException {
-        return new WriteCSVResult(csvFormat, schema, 
getSchemaAccessWriter(schema),
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws 
SchemaNotFoundException, IOException {
+        return new WriteCSVResult(csvFormat, schema, 
getSchemaAccessWriter(schema), out,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), 
getTimestampFormat().orElse(null), includeHeader);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index f11249b..8475a50 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -20,38 +20,44 @@ package org.apache.nifi.csv;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.util.Collections;
+import java.util.Map;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
 
-public class WriteCSVResult implements RecordSetWriter {
-    private final CSVFormat csvFormat;
+public class WriteCSVResult extends AbstractRecordSetWriter implements 
RecordSetWriter {
     private final RecordSchema recordSchema;
     private final SchemaAccessWriter schemaWriter;
     private final String dateFormat;
     private final String timeFormat;
     private final String timestampFormat;
-    private final boolean includeHeaderLine;
+    private final CSVPrinter printer;
+    private final Object[] fieldValues;
 
-    public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaWriter,
-        final String dateFormat, final String timeFormat, final String 
timestampFormat, final boolean includeHeaderLine) {
-        this.csvFormat = csvFormat;
+    public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
+        final String dateFormat, final String timeFormat, final String 
timestampFormat, final boolean includeHeaderLine) throws IOException {
+
+        super(out);
         this.recordSchema = recordSchema;
         this.schemaWriter = schemaWriter;
         this.dateFormat = dateFormat;
         this.timeFormat = timeFormat;
         this.timestampFormat = timestampFormat;
-        this.includeHeaderLine = includeHeaderLine;
+
+        final String[] columnNames = recordSchema.getFieldNames().toArray(new 
String[0]);
+        final CSVFormat formatWithHeader = 
csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
+        final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
+        printer = new CSVPrinter(streamWriter, formatWithHeader);
+
+        fieldValues = new Object[recordSchema.getFieldCount()];
     }
 
     private String getFormat(final RecordField field) {
@@ -69,60 +75,29 @@ public class WriteCSVResult implements RecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final RecordSet rs, final OutputStream rawOut) 
throws IOException {
-        int count = 0;
-
-        final String[] columnNames = recordSchema.getFieldNames().toArray(new 
String[0]);
-        final CSVFormat formatWithHeader = 
csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
+    protected void onBeginRecordSet() throws IOException {
+        schemaWriter.writeHeader(recordSchema, getOutputStream());
+    }
 
-        schemaWriter.writeHeader(recordSchema, rawOut);
-
-        try (final OutputStream nonCloseable = new 
NonCloseableOutputStream(rawOut);
-            final OutputStreamWriter streamWriter = new 
OutputStreamWriter(nonCloseable);
-            final CSVPrinter printer = new CSVPrinter(streamWriter, 
formatWithHeader)) {
-
-            try {
-                Record record;
-                while ((record = rs.next()) != null) {
-                    final Object[] colVals = new 
Object[recordSchema.getFieldCount()];
-                    int i = 0;
-                    for (final RecordField recordField : 
recordSchema.getFields()) {
-                        colVals[i++] = record.getAsString(recordField, 
getFormat(recordField));
-                    }
-
-                    printer.printRecord(colVals);
-                    count++;
-                }
-            } catch (final Exception e) {
-                throw new IOException("Failed to serialize results", e);
-            }
-        }
+    @Override
+    protected Map<String, String> onFinishRecordSet() throws IOException {
+        return schemaWriter.getAttributes(recordSchema);
+    }
 
-        return WriteResult.of(count, schemaWriter.getAttributes(recordSchema));
+    @Override
+    public void close() throws IOException {
+        printer.close();
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream rawOut) 
throws IOException {
-
-        try (final OutputStream nonCloseable = new 
NonCloseableOutputStream(rawOut);
-            final OutputStreamWriter streamWriter = new 
OutputStreamWriter(nonCloseable);
-            final CSVPrinter printer = new CSVPrinter(streamWriter, 
csvFormat)) {
-
-            try {
-                final RecordSchema schema = record.getSchema();
-                final Object[] colVals = new Object[schema.getFieldCount()];
-                int i = 0;
-                for (final RecordField recordField : schema.getFields()) {
-                    colVals[i++] = record.getAsString(recordField, 
getFormat(recordField));
-                }
-
-                printer.printRecord(colVals);
-            } catch (final Exception e) {
-                throw new IOException("Failed to serialize results", e);
-            }
+    public WriteResult write(final Record record) throws IOException {
+        int i = 0;
+        for (final RecordField recordField : recordSchema.getFields()) {
+            fieldValues[i++] = record.getAsString(recordField, 
getFormat(recordField));
         }
 
-        return WriteResult.of(1, Collections.emptyMap());
+        printer.printRecord(fieldValues);
+        return WriteResult.of(1, schemaWriter.getAttributes(recordSchema));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index 91370a7..9a722c1 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.json;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -26,6 +27,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
@@ -62,8 +64,8 @@ public class JsonRecordSetWriter extends 
DateTimeTextRecordSetWriter implements
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema) throws SchemaNotFoundException, IOException {
-        return new WriteJsonResult(logger, schema, 
getSchemaAccessWriter(schema), prettyPrint,
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws 
SchemaNotFoundException, IOException {
+        return new WriteJsonResult(logger, schema, 
getSchemaAccessWriter(schema), out, prettyPrint,
             getDateFormat().orElse(null), getTimeFormat().orElse(null), 
getTimestampFormat().orElse(null));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index bd7dc5e..b73ecab 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -20,13 +20,13 @@ package org.apache.nifi.json;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigInteger;
-import java.sql.SQLException;
 import java.text.DateFormat;
 import java.util.Map;
 import java.util.Optional;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
@@ -34,87 +34,76 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.serialization.record.SerializedForm;
 import org.apache.nifi.serialization.record.type.ArrayDataType;
 import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.MapDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerationException;
 import org.codehaus.jackson.JsonGenerator;
 
-public class WriteJsonResult implements RecordSetWriter {
+public class WriteJsonResult extends AbstractRecordSetWriter implements 
RecordSetWriter {
     private final ComponentLog logger;
-    private final boolean prettyPrint;
     private final SchemaAccessWriter schemaAccess;
     private final RecordSchema recordSchema;
     private final JsonFactory factory = new JsonFactory();
     private final DateFormat dateFormat;
     private final DateFormat timeFormat;
     private final DateFormat timestampFormat;
+    private final JsonGenerator generator;
 
-    public WriteJsonResult(final ComponentLog logger, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaAccess, final boolean prettyPrint,
-        final String dateFormat, final String timeFormat, final String 
timestampFormat) {
+    public WriteJsonResult(final ComponentLog logger, final RecordSchema 
recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, 
final boolean prettyPrint,
+        final String dateFormat, final String timeFormat, final String 
timestampFormat) throws IOException {
 
+        super(out);
         this.logger = logger;
         this.recordSchema = recordSchema;
-        this.prettyPrint = prettyPrint;
         this.schemaAccess = schemaAccess;
 
         this.dateFormat = dateFormat == null ? null : 
DataTypeUtils.getDateFormat(dateFormat);
         this.timeFormat = timeFormat == null ? null : 
DataTypeUtils.getDateFormat(timeFormat);
         this.timestampFormat = timestampFormat == null ? null : 
DataTypeUtils.getDateFormat(timestampFormat);
-    }
 
-    @Override
-    public WriteResult write(final RecordSet rs, final OutputStream rawOut) 
throws IOException {
-        int count = 0;
+        this.generator = factory.createJsonGenerator(out);
+        if (prettyPrint) {
+            generator.useDefaultPrettyPrinter();
+        }
+    }
 
-        schemaAccess.writeHeader(recordSchema, rawOut);
 
-        try (final JsonGenerator generator = factory.createJsonGenerator(new 
NonCloseableOutputStream(rawOut))) {
-            if (prettyPrint) {
-                generator.useDefaultPrettyPrinter();
-            }
+    @Override
+    protected void onBeginRecordSet() throws IOException {
+        final OutputStream out = getOutputStream();
+        schemaAccess.writeHeader(recordSchema, out);
 
-            generator.writeStartArray();
+        generator.writeStartArray();
+    }
 
-            Record record;
-            while ((record = rs.next()) != null) {
-                count++;
-                writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject());
-            }
+    @Override
+    protected Map<String, String> onFinishRecordSet() throws IOException {
+        generator.writeEndArray();
+        return schemaAccess.getAttributes(recordSchema);
+    }
 
-            generator.writeEndArray();
-        } catch (final SQLException e) {
-            throw new IOException("Failed to serialize Result Set to stream", 
e);
+    @Override
+    public void close() throws IOException {
+        if (generator != null) {
+            generator.close();
         }
 
-        return WriteResult.of(count, schemaAccess.getAttributes(recordSchema));
+        super.close();
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream rawOut) 
throws IOException {
-        schemaAccess.writeHeader(recordSchema, rawOut);
-
-        try (final JsonGenerator generator = factory.createJsonGenerator(new 
NonCloseableOutputStream(rawOut))) {
-            if (prettyPrint) {
-                generator.useDefaultPrettyPrinter();
-            }
-
-            writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject());
-        } catch (final SQLException e) {
-            throw new IOException("Failed to write records to stream", e);
-        }
-
+    public WriteResult write(final Record record) throws IOException {
+        writeRecord(record, recordSchema, generator, g -> 
g.writeStartObject(), g -> g.writeEndObject());
         return WriteResult.of(1, schemaAccess.getAttributes(recordSchema));
     }
 
     private void writeRecord(final Record record, final RecordSchema 
writeSchema, final JsonGenerator generator, final GeneratorTask startTask, 
final GeneratorTask endTask)
-        throws JsonGenerationException, IOException, SQLException {
+        throws JsonGenerationException, IOException {
 
         final Optional<SerializedForm> serializedForm = 
record.getSerializedForm();
         if (serializedForm.isPresent()) {
@@ -155,7 +144,7 @@ public class WriteJsonResult implements RecordSetWriter {
 
     @SuppressWarnings("unchecked")
     private void writeValue(final JsonGenerator generator, final Object value, 
final String fieldName, final DataType dataType, final boolean moreCols)
-        throws JsonGenerationException, IOException, SQLException {
+        throws JsonGenerationException, IOException {
         if (value == null) {
             generator.writeNull();
             return;
@@ -268,7 +257,7 @@ public class WriteJsonResult implements RecordSetWriter {
     }
 
     private void writeArray(final Object[] values, final String fieldName, 
final JsonGenerator generator, final DataType elementType)
-        throws JsonGenerationException, IOException, SQLException {
+        throws JsonGenerationException, IOException {
         generator.writeStartArray();
         for (int i = 0; i < values.length; i++) {
             final boolean moreEntries = i < values.length - 1;

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
index 4fcc3a2..e6a9054 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -19,6 +19,7 @@ package org.apache.nifi.text;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -81,8 +82,8 @@ public class FreeFormTextRecordSetWriter extends 
SchemaRegistryRecordSetWriter i
     }
 
     @Override
-    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema) {
-        return new FreeFormTextWriter(textValue, characterSet);
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final FlowFile flowFile, final OutputStream out) {
+        return new FreeFormTextWriter(textValue, characterSet, out);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
index 39416c8..95f2a73 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextWriter.java
@@ -27,42 +27,24 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
 
-public class FreeFormTextWriter implements RecordSetWriter {
+public class FreeFormTextWriter extends AbstractRecordSetWriter implements 
RecordSetWriter {
     private static final byte NEW_LINE = (byte) '\n';
     private final PropertyValue propertyValue;
     private final Charset charset;
+    private final OutputStream out;
 
-    public FreeFormTextWriter(final PropertyValue textPropertyValue, final 
Charset characterSet) {
-        propertyValue = textPropertyValue;
-        charset = characterSet;
-    }
-
-    @Override
-    public WriteResult write(final RecordSet recordSet, final OutputStream 
out) throws IOException {
-        int count = 0;
-
-        try {
-            Record record;
-            while ((record = recordSet.next()) != null) {
-                final RecordSchema schema = record.getSchema();
-                final List<String> colNames = getColumnNames(schema);
-
-                count++;
-                write(record, out, colNames);
-            }
-        } catch (final Exception e) {
-            throw new ProcessException(e);
-        }
-
-        return WriteResult.of(count, Collections.emptyMap());
+    public FreeFormTextWriter(final PropertyValue textPropertyValue, final 
Charset characterSet, final OutputStream out) {
+        super(out);
+        this.propertyValue = textPropertyValue;
+        this.charset = characterSet;
+        this.out = out;
     }
 
     private List<String> getColumnNames(final RecordSchema schema) {
@@ -78,7 +60,7 @@ public class FreeFormTextWriter implements RecordSetWriter {
     }
 
     @Override
-    public WriteResult write(final Record record, final OutputStream out) 
throws IOException {
+    public WriteResult write(final Record record) throws IOException {
         write(record, out, getColumnNames(record.getSchema()));
         return WriteResult.of(1, Collections.emptyMap());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 0a84aec..8c20ba7 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigDecimal;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Time;
@@ -47,6 +48,7 @@ import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Array;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
@@ -60,7 +62,7 @@ import org.junit.Test;
 
 public abstract class TestWriteAvroResult {
 
-    protected abstract WriteAvroResult createWriter(Schema schema);
+    protected abstract RecordSetWriter createWriter(Schema schema, 
OutputStream out) throws IOException;
 
     protected abstract GenericRecord readRecord(InputStream in, Schema schema) 
throws IOException;
 
@@ -80,7 +82,7 @@ public abstract class TestWriteAvroResult {
     }
 
     private void testLogicalTypes(Schema schema) throws ParseException, 
IOException {
-        final WriteAvroResult writer = createWriter(schema);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("timeMillis", 
RecordFieldType.TIME.getDataType()));
@@ -108,12 +110,12 @@ public abstract class TestWriteAvroResult {
         values.put("decimal", expectedDecimal.doubleValue());
         final Record record = new MapRecord(recordSchema, values);
 
-        final byte[] data;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            writer.write(RecordSet.of(record.getSchema(), record), baos);
-            data = baos.toByteArray();
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writer.write(RecordSet.of(record.getSchema(), record));
         }
 
+        final byte[] data = baos.toByteArray();
+
         try (final InputStream in = new ByteArrayInputStream(data)) {
             final GenericRecord avroRecord = readRecord(in, schema);
             final long secondsSinceMidnight = 33 + (20 * 60) + (14 * 60 * 60);
@@ -138,7 +140,7 @@ public abstract class TestWriteAvroResult {
     @Test
     public void testDataTypes() throws IOException {
         final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/avro/datatypes.avsc"));
-        final WriteAvroResult writer = createWriter(schema);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
         final List<RecordField> subRecordFields = 
Collections.singletonList(new RecordField("field1", 
RecordFieldType.STRING.getDataType()));
         final RecordSchema subRecordSchema = new 
SimpleRecordSchema(subRecordFields);
@@ -178,13 +180,14 @@ public abstract class TestWriteAvroResult {
 
         final Record record = new MapRecord(recordSchema, values);
 
-        final byte[] data;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            final WriteResult writeResult = 
writer.write(RecordSet.of(record.getSchema(), record), baos);
-            verify(writeResult);
-            data = baos.toByteArray();
+        final WriteResult writeResult;
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writeResult = writer.write(RecordSet.of(record.getSchema(), 
record));
         }
 
+        verify(writeResult);
+        final byte[] data = baos.toByteArray();
+
         try (final InputStream in = new ByteArrayInputStream(data)) {
             final GenericRecord avroRecord = readRecord(in, schema);
             assertMatch(record, avroRecord);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
index 6ace012..9761076 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
@@ -26,12 +27,13 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.StringType;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.serialization.RecordSetWriter;
 
 public class TestWriteAvroResultWithSchema extends TestWriteAvroResult {
 
     @Override
-    protected WriteAvroResult createWriter(final Schema schema) {
-        return new WriteAvroResultWithSchema(schema);
+    protected RecordSetWriter createWriter(final Schema schema, final 
OutputStream out) throws IOException {
+        return new WriteAvroResultWithSchema(schema, out);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index d40bb55..c25f0aa 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -19,6 +19,7 @@ package org.apache.nifi.avro;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Map;
 
 import org.apache.avro.Schema;
@@ -27,14 +28,15 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.junit.Assert;
 
 public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
 
     @Override
-    protected WriteAvroResult createWriter(final Schema schema) {
-        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute());
+    protected RecordSetWriter createWriter(final Schema schema, final 
OutputStream out) throws IOException {
+        return new WriteAvroResultWithExternalSchema(schema, 
AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
index 9424e79..c447664 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestWriteCSVResult.java
@@ -71,37 +71,38 @@ public class TestWriteCSVResult {
         }
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final WriteCSVResult result = new WriteCSVResult(csvFormat, schema, 
new SchemaNameAsAttribute(),
-            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true);
-
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final long now = System.currentTimeMillis();
-        final Map<String, Object> valueMap = new HashMap<>();
-        valueMap.put("string", "string");
-        valueMap.put("boolean", true);
-        valueMap.put("byte", (byte) 1);
-        valueMap.put("char", 'c');
-        valueMap.put("short", (short) 8);
-        valueMap.put("int", 9);
-        valueMap.put("bigint", BigInteger.valueOf(8L));
-        valueMap.put("long", 8L);
-        valueMap.put("float", 8.0F);
-        valueMap.put("double", 8.0D);
-        valueMap.put("date", new Date(now));
-        valueMap.put("time", new Time(now));
-        valueMap.put("timestamp", new Timestamp(now));
-        valueMap.put("record", null);
-        valueMap.put("choice", 48L);
-        valueMap.put("array", null);
-
-        final Record record = new MapRecord(schema, valueMap);
-        final RecordSet rs = RecordSet.of(schema, record);
-
-        final String output;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            result.write(rs, baos);
-            output = new String(baos.toByteArray(), StandardCharsets.UTF_8);
+
+        try (final WriteCSVResult result = new WriteCSVResult(csvFormat, 
schema, new SchemaNameAsAttribute(), baos,
+            RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), true)) {
+
+            final Map<String, Object> valueMap = new HashMap<>();
+            valueMap.put("string", "string");
+            valueMap.put("boolean", true);
+            valueMap.put("byte", (byte) 1);
+            valueMap.put("char", 'c');
+            valueMap.put("short", (short) 8);
+            valueMap.put("int", 9);
+            valueMap.put("bigint", BigInteger.valueOf(8L));
+            valueMap.put("long", 8L);
+            valueMap.put("float", 8.0F);
+            valueMap.put("double", 8.0D);
+            valueMap.put("date", new Date(now));
+            valueMap.put("time", new Time(now));
+            valueMap.put("timestamp", new Timestamp(now));
+            valueMap.put("record", null);
+            valueMap.put("choice", 48L);
+            valueMap.put("array", null);
+
+            final Record record = new MapRecord(schema, valueMap);
+            final RecordSet rs = RecordSet.of(schema, record);
+
+            result.write(rs);
         }
 
+        final String output = new String(baos.toByteArray(), 
StandardCharsets.UTF_8);
+
         headerBuilder.deleteCharAt(headerBuilder.length() - 1);
         final String headerLine = headerBuilder.toString();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 1acc496..16d2012 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -72,9 +72,7 @@ public class TestWriteJsonResult {
         }
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(),
-            RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
-
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
         df.setTimeZone(TimeZone.getTimeZone("gmt"));
         final long time = df.parse("2017/01/01 17:00:00.000").getTime();
@@ -105,12 +103,14 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, valueMap);
         final RecordSet rs = RecordSet.of(schema, record);
 
-        final String output;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            writer.write(rs, baos);
-            output = baos.toString();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
+            RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+
+            writer.write(rs);
         }
 
+        final String output = baos.toString();
+
         final String expected = new 
String(Files.readAllBytes(Paths.get("src/test/resources/json/output/dataTypes.json")));
         assertEquals(expected, output);
     }
@@ -139,15 +139,15 @@ public class TestWriteJsonResult {
 
         final RecordSet rs = RecordSet.of(schema, record1, record2);
 
-        final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), true, RecordFieldType.DATE.getDefaultFormat(),
-            RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat());
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
+            RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat())) {
 
-        final byte[] data;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            writer.write(rs, baos);
-            data = baos.toByteArray();
+            writer.write(rs);
         }
 
+        final byte[] data = baos.toByteArray();
+
         final String expected = "[ " + serialized1 + ", " + serialized2 + " ]";
 
         final String output = new String(data, StandardCharsets.UTF_8);
@@ -171,14 +171,13 @@ public class TestWriteJsonResult {
         final Record record = new MapRecord(schema, values);
         final RecordSet rs = RecordSet.of(schema, record);
 
-        final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), false, null, null, null);
-
-        final byte[] data;
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            writer.write(rs, baos);
-            data = baos.toByteArray();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final WriteJsonResult writer = new 
WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new 
SchemaNameAsAttribute(), baos, false, null, null, null)) {
+            writer.write(rs);
         }
 
+        final byte[] data = baos.toByteArray();
+
         final String expected = 
"[{\"timestamp\":37293723,\"time\":37293723,\"date\":37293723}]";
 
         final String output = new String(data, StandardCharsets.UTF_8);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9bd0246a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index ba1b490..3105d8d 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -68,6 +68,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
             <scope>compile</scope>
         </dependency>

Reply via email to