http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
new file mode 100644
index 0000000..31d0ec8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+@EventDriven
+@InputRequirement(Requirement.INPUT_ALLOWED)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@CapabilityDescription("Executes a provided SQL select query. The query result will be converted to the format specified by a Record Writer. "
+        + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+        + "a timer or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. "
+        + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the "
+        + "select query, and the query may use the ? character as a parameter placeholder. In this case, the parameters to use must exist as FlowFile attributes "
+        + "with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be "
+        + "a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format. "
+        + "The FlowFile attribute 'executesql.row.count' indicates how many rows were selected.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "sql.args.N.type", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The type of each Parameter is specified as an integer "
+                + "that represents the JDBC Type of the parameter."),
+        @ReadsAttribute(attribute = "sql.args.N.value", description = "Incoming FlowFiles are expected to be parameterized SQL statements. The values of the Parameters are specified as "
+                + "sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."),
+        @ReadsAttribute(attribute = "sql.args.N.format", description = "This attribute is always optional, but the default options may not always work for your data. "
+                + "Incoming FlowFiles are expected to be parameterized SQL statements. In some cases "
+                + "a format option needs to be specified; currently this is only applicable for binary data types, dates, times, and timestamps. Binary Data Types (defaults to 'ascii') - "
+                + "ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. "
+                + "base64: the string is a Base64-encoded string that can be decoded to bytes. "
+                + "hex: the string is hex-encoded with all letters in upper case and no '0x' at the beginning. "
+                + "Dates/Times/Timestamps - "
+                + "Date, Time, and Timestamp formats all support both custom formats and named formats ('yyyy-MM-dd', 'ISO_OFFSET_DATE_TIME') "
+                + "as specified according to java.time.format.DateTimeFormatter. "
+                + "If not specified, a long value is expected to be a unix epoch (milliseconds since 1970-01-01), or a string value in "
+                + "'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines, e.g. Derby or MySQL, do not support milliseconds and will truncate them), "
+                + "or 'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp.")
+})
+@WritesAttributes({
+        @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned by the select query"),
+        @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
+                + "the zero-based index of this result set."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+                + "FlowFiles were produced."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
+public class ExecuteSQLRecord extends AbstractExecuteSQL {
+
+
+    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("esqlrecord-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+            .name("esqlrecord-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change characters in column names. For example, colons and periods will be changed to underscores.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public ExecuteSQLRecord() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        r.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
+        pds.add(SQL_SELECT_QUERY);
+        pds.add(QUERY_TIMEOUT);
+        pds.add(RECORD_WRITER_FACTORY);
+        pds.add(NORMALIZE_NAMES);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .build();
+        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+    }
+}
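
As the capability description above notes, an incoming FlowFile can parameterize the query through sql.args.N.* attributes. A minimal sketch, with a hypothetical table and values (the numeric type codes come from java.sql.Types):

    import java.util.HashMap;
    import java.util.Map;

    // Attributes for a FlowFile whose UTF-8 content is:
    //   SELECT name, status FROM users WHERE status = ? AND id > ?
    public class SqlArgsExample {
        public static Map<String, String> exampleAttributes() {
            final Map<String, String> attrs = new HashMap<>();
            attrs.put("sql.args.1.type", "12");       // java.sql.Types.VARCHAR
            attrs.put("sql.args.1.value", "ACTIVE");
            attrs.put("sql.args.2.type", "4");        // java.sql.Types.INTEGER
            attrs.put("sql.args.2.value", "42");
            return attrs;
        }
    }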

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
index 1923e2c..71348ef 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -27,49 +26,21 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateManager;
-import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.dbcp.DBCPService;
-import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.IntStream;
 
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -112,60 +83,7 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
 @DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
         + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
-public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
-
-    public static final String RESULT_TABLENAME = "tablename";
-    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
-
-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-
-    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Fetch Size")
-            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
-                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
-            .name("qdbt-max-rows")
-            .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
-                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("qdbt-output-batch-size")
-            .displayName("Output Batch Size")
-            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
-                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
-                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
-                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
-                    + "property is set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
-            .name("qdbt-max-frags")
-            .displayName("Maximum Number of Fragments")
-            .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
-                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
-                    + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
+public class QueryDatabaseTable extends AbstractQueryDatabaseTable {
 
     public QueryDatabaseTable() {
         final Set<Relationship> r = new HashSet<>();
@@ -197,365 +115,22 @@ public class QueryDatabaseTable extends AbstractDatabaseFetchProcessor {
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propDescriptors;
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .name(propertyDescriptorName)
-                .required(false)
-                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
-                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
-                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-                .dynamic(true)
-                .build();
-    }
-
-    @OnScheduled
-    public void setup(final ProcessContext context) {
-        maxValueProperties = getDefaultMaxValueProperties(context, null);
-    }
-
-    @OnStopped
-    public void stop() {
-        // Reset the column type map in case properties change
-        setupComplete.set(false);
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        // Fetch the column/table info once
-        if (!setupComplete.get()) {
-            super.setup(context);
-        }
-        ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
-        final ComponentLog logger = getLogger();
-
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
-        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
-        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
-        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
-        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
-        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
-        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
-        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
-                ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
-                : 0;
+        final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger();
+        final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger();
+
         final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                 .recordName(tableName)
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .defaultPrecision(defaultPrecision)
+                .defaultScale(defaultScale)
                 .maxRows(maxRowsPerFlowFile)
-                .convertNames(context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean())
-                .useLogicalTypes(context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean())
-                .defaultPrecision(context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions().asInteger())
-                .defaultScale(context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions().asInteger())
                 .build();
-
-        final StateManager stateManager = context.getStateManager();
-        final StateMap stateMap;
-
-        try {
-            stateMap = stateManager.getState(Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform "
-                    + "query until this is accomplished.", ioe);
-            context.yield();
-            return;
-        }
-        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
-        // set as the current state map (after the session has been committed)
-        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
-
-        //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
-        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
-            String maxPropKey = maxProp.getKey().toLowerCase();
-            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
-            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
-                String newMaxPropValue;
-                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                // the value has been stored under a key that is only the column name. Fall back to check the column name,
-                // but store the new initial max value under the fully-qualified key.
-                if (statePropertyMap.containsKey(maxPropKey)) {
-                    newMaxPropValue = statePropertyMap.get(maxPropKey);
-                } else {
-                    newMaxPropValue = maxProp.getValue();
-                }
-                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
-
-            }
-        }
-
-        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
-                ? null
-                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
-        final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
-        final StopWatch stopWatch = new StopWatch(true);
-        final String fragmentIdentifier = UUID.randomUUID().toString();
-
-        try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
-             final Statement st = con.createStatement()) {
-
-            if (fetchSize != null && fetchSize > 0) {
-                try {
-                    st.setFetchSize(fetchSize);
-                } catch (SQLException se) {
-                    // Not all drivers support this, just log the error (at debug level) and move on
-                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
-                }
-            }
-
-            String jdbcURL = "DBCPService";
-            try {
-                DatabaseMetaData databaseMetaData = con.getMetaData();
-                if (databaseMetaData != null) {
-                    jdbcURL = databaseMetaData.getURL();
-                }
-            } catch (SQLException se) {
-                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
-            }
-
-            final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-            if (logger.isDebugEnabled()) {
-                logger.debug("Executing query {}", new Object[] { selectQuery });
-            }
-            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
-                int fragmentIndex=0;
-                // Max values will be updated in the state property map by the callback
-                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
-
-                while(true) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-
-                    FlowFile fileToProcess = session.create();
-                    try {
-                        fileToProcess = session.write(fileToProcess, out -> {
-                            try {
-                                nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, maxValCollector));
-                            } catch (SQLException | RuntimeException e) {
-                                throw new ProcessException("Error during database query or conversion of records to Avro.", e);
-                            }
-                        });
-                    } catch (ProcessException e) {
-                        // Add flowfile to results before rethrowing so it will be removed from session in outer catch
-                        resultSetFlowFiles.add(fileToProcess);
-                        throw e;
-                    }
-
-                    if (nrOfRows.get() > 0) {
-                        // set attribute how many rows were selected
-                        fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                        fileToProcess = session.putAttribute(fileToProcess, RESULT_TABLENAME, tableName);
-                        fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                        if(maxRowsPerFlowFile > 0) {
-                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_ID, fragmentIdentifier);
-                            fileToProcess = session.putAttribute(fileToProcess, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                        }
-
-                        logger.info("{} contains {} Avro records; transferring to 'success'",
-                                new Object[]{fileToProcess, nrOfRows.get()});
-
-                        session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-                        resultSetFlowFiles.add(fileToProcess);
-                        // If we've reached the batch size, send out the flow files
-                        if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                            session.commit();
-                            resultSetFlowFiles.clear();
-                        }
-                    } else {
-                        // If there were no rows returned, don't send the flowfile
-                        session.remove(fileToProcess);
-                        // If no rows and this was first FlowFile, yield
-                        if(fragmentIndex == 0){
-                            context.yield();
-                        }
-                        break;
-                    }
-
-                    fragmentIndex++;
-                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
-                        break;
-                    }
-
-                    // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
-                    if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
-                        break;
-                    }
-
-                    // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
-                    if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
-                        break;
-                    }
-                }
-
-                // Apply state changes from the Max Value tracker
-                maxValCollector.applyStateChanges();
-
-                // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
-                if (outputBatchSize == 0) {
-                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                        // Add maximum values as attributes
-                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
-                            // Get just the column name from the key
-                            String key = entry.getKey();
-                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
-                            resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
-                        }
-
-                        //set count on all FlowFiles
-                        if (maxRowsPerFlowFile > 0) {
-                            resultSetFlowFiles.set(i,
-                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
-                        }
-                    }
-                }
-            } catch (final SQLException e) {
-                throw e;
-            }
-
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-
-        } catch (final ProcessException | SQLException e) {
-            logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e});
-            if (!resultSetFlowFiles.isEmpty()) {
-                session.remove(resultSetFlowFiles);
-            }
-            context.yield();
-        } finally {
-            session.commit();
-            try {
-                // Update the state
-                stateManager.setState(statePropertyMap, Scope.CLUSTER);
-            } catch (IOException ioe) {
-                getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe});
-            }
-        }
-    }
-
-    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
-                              String customWhereClause, Map<String, String> stateMap) {
-
-        return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
-    }
-
-    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
-                              String customWhereClause, Map<String, String> stateMap) {
-        if (StringUtils.isEmpty(tableName)) {
-            throw new IllegalArgumentException("Table name must be specified");
-        }
-        final StringBuilder query;
-
-        if (StringUtils.isEmpty(sqlQuery)) {
-            query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
-        } else {
-            query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
-        }
-
-        List<String> whereClauses = new ArrayList<>();
-        // Check state map for last max values
-        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
-            IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
-                String colName = maxValColumnNames.get(index);
-                String maxValueKey = getStateKey(tableName, colName, dbAdapter);
-                String maxValue = stateMap.get(maxValueKey);
-                if (StringUtils.isEmpty(maxValue)) {
-                    // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                    // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
-                    // maximum value is observed, it will be stored under the fully-qualified key from then on.
-                    maxValue = stateMap.get(colName.toLowerCase());
-                }
-                if (!StringUtils.isEmpty(maxValue)) {
-                    Integer type = columnTypeMap.get(maxValueKey);
-                    if (type == null) {
-                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
-                        throw new IllegalArgumentException("No column type found for: " + colName);
-                    }
-                    // Add a condition for the WHERE clause
-                    whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
-                }
-            });
-        }
-
-        if (customWhereClause != null) {
-            whereClauses.add("(" + customWhereClause + ")");
-        }
-
-        if (!whereClauses.isEmpty()) {
-            query.append(" WHERE ");
-            query.append(StringUtils.join(whereClauses, " AND "));
-        }
-
-        return query.toString();
-    }
-
-    protected class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
-        DatabaseAdapter dbAdapter;
-        final Map<String, String> newColMap;
-        final Map<String, String> originalState;
-        String tableName;
-
-        public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
-            this.dbAdapter = dbAdapter;
-            this.originalState = stateMap;
-
-            this.newColMap = new HashMap<>();
-            this.newColMap.putAll(stateMap);
-
-            this.tableName = tableName;
-        }
-
-        @Override
-        public void processRow(ResultSet resultSet) throws IOException {
-            if (resultSet == null) {
-                return;
-            }
-            try {
-                // Iterate over the row, check-and-set max values
-                final ResultSetMetaData meta = resultSet.getMetaData();
-                final int nrOfColumns = meta.getColumnCount();
-                if (nrOfColumns > 0) {
-                    for (int i = 1; i <= nrOfColumns; i++) {
-                        String colName = meta.getColumnName(i).toLowerCase();
-                        String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
-                        Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
-                        // Skip any columns we're not keeping track of or whose value is null
-                        if (type == null || resultSet.getObject(i) == null) {
-                            continue;
-                        }
-                        String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
-                        // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
-                        // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
-                        // maximum value is observed, it will be stored under the fully-qualified key from then on.
-                        if (StringUtils.isEmpty(maxValueString)) {
-                            maxValueString = newColMap.get(colName);
-                        }
-                        String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
-                        if (newMaxValueString != null) {
-                            newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
-                        }
-                    }
-                }
-            } catch (ParseException | SQLException e) {
-                throw new IOException(e);
-            }
-        }
-
-        @Override
-        public void applyStateChanges() {
-            this.originalState.putAll(this.newColMap);
-        }
+        return new DefaultAvroSqlWriter(options);
     }
 }
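
The net effect of this change: the shared query/fetch loop that used to live in QueryDatabaseTable.onTrigger now lives in AbstractQueryDatabaseTable (not included in this excerpt), and subclasses supply only the output format via configureSqlWriter. A rough template-method sketch of that split, with illustrative names only:

    import java.io.OutputStream;
    import java.sql.ResultSet;

    // Simplified shape; the real AbstractQueryDatabaseTable also handles state,
    // fragments, provenance, and session commits.
    abstract class FetchTemplateSketch {
        interface Writer {
            long write(ResultSet rs, OutputStream out) throws Exception;
        }

        final long trigger(ResultSet rs, OutputStream out) throws Exception {
            Writer writer = configureWriter(); // subclass picks Avro vs. Record output
            return writer.write(rs, out);      // shared loop would wrap this call
        }

        abstract Writer configureWriter();
    }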

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
new file mode 100644
index 0000000..ea89256
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTableRecord.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGICAL_TYPES;
+
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "select", "jdbc", "query", "database", "record"})
+@SeeAlso({GenerateTableFetch.class, ExecuteSQL.class})
+@CapabilityDescription("Generates a SQL select query, or uses a provided statement, and executes it to fetch all rows whose values in the specified "
+        + "Maximum Value column(s) are larger than the "
+        + "previously-seen maxima. The query result will be converted to the format specified by the record writer. Expression Language is supported for several properties, but no incoming "
+        + "connections are permitted. The Variable Registry may be used to provide values for any property containing Expression Language. If it is desired to "
+        + "leverage flow file attributes to perform these queries, the GenerateTableFetch and/or ExecuteSQL processors can be used for this purpose. "
+        + "Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on "
+        + "a timer or cron expression, using the standard scheduling methods. This processor is intended to be run on the Primary Node only. The FlowFile attribute "
+        + "'querydbtable.row.count' indicates how many rows were selected.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for "
+        + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor "
+        + "to fetch only those records that have max values greater than the retained values. This can be used for "
+        + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor "
+        + "per the State Management documentation.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "tablename", description = "Name of the table being queried"),
+        @WritesAttribute(attribute = "querydbtable.row.count", description = "The number of rows selected by the query"),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set "
+                + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of "
+                + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the "
+                + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order "
+                + "FlowFiles were produced."),
+        @WritesAttribute(attribute = "maxvalue.*", description = "Each attribute contains the observed maximum value of a specified 'Maximum-value Column'. The "
+                + "suffix of the attribute is the name of the column. If Output Batch Size is set, then this attribute will not be populated."),
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records output by the Record Writer.")
+})
+@DynamicProperty(name = "initial.maxvalue.<max_value_column>", value = "Initial maximum value for the specified column",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Specifies an initial max value for max value column(s). Properties should "
+        + "be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).")
+public class QueryDatabaseTableRecord extends AbstractQueryDatabaseTable {
+
+    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
+            .name("qdbtr-record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
+                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor NORMALIZE_NAMES = new PropertyDescriptor.Builder()
+            .name("qdbtr-normalize")
+            .displayName("Normalize Table/Column Names")
+            .description("Whether to change characters in column names when creating the output schema. For example, colons and periods will be changed to underscores.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public QueryDatabaseTableRecord() {
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(DBCP_SERVICE);
+        pds.add(DB_TYPE);
+        pds.add(new PropertyDescriptor.Builder()
+                .fromPropertyDescriptor(TABLE_NAME)
+                .description("The name of the database table to be queried. When a custom query is used, this property is used to alias the query and appears as an attribute on the FlowFile.")
+                .build());
+        pds.add(COLUMN_NAMES);
+        pds.add(WHERE_CLAUSE);
+        pds.add(SQL_QUERY);
+        pds.add(RECORD_WRITER_FACTORY);
+        pds.add(MAX_VALUE_COLUMN_NAMES);
+        pds.add(QUERY_TIMEOUT);
+        pds.add(FETCH_SIZE);
+        pds.add(MAX_ROWS_PER_FLOW_FILE);
+        pds.add(OUTPUT_BATCH_SIZE);
+        pds.add(MAX_FRAGMENTS);
+        pds.add(NORMALIZE_NAMES);
+        pds.add(USE_AVRO_LOGICAL_TYPES);
+
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+    @Override
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context) {
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES).asBoolean();
+        final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .build();
+        final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+
+        return new RecordSqlWriter(recordSetWriterFactory, options, maxRowsPerFlowFile, Collections.emptyMap());
+    }
+}
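
Since QueryDatabaseTableRecord takes no incoming connections, the initial.maxvalue.<column> dynamic property is the way to seed the tracked state. A hypothetical configuration sketch using NiFi's mock framework (a DBCP service and Record Writer would still have to be configured before the processor is valid):

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class InitialMaxValueSketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(QueryDatabaseTableRecord.class);
            // Dynamic property per the @DynamicProperty annotation above:
            // only rows with id greater than 100 are fetched on the first run.
            runner.setProperty("initial.maxvalue.id", "100");
        }
    }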

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
new file mode 100644
index 0000000..574aca7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/DefaultAvroSqlWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultAvroSqlWriter implements SqlWriter {
+
+    private final JdbcCommon.AvroConversionOptions options;
+
+    private final Map<String,String> attributesToAdd = new HashMap<String,String>() {{
+        put(CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
+    }};
+
+    public DefaultAvroSqlWriter(JdbcCommon.AvroConversionOptions options) {
+        this.options = options;
+    }
+
+    @Override
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+        try {
+            return JdbcCommon.convertToAvroStream(resultSet, outputStream, options, callback);
+        } catch (SQLException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getAttributesToAdd() {
+        return attributesToAdd;
+    }
+
+    @Override
+    public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+        JdbcCommon.createEmptyAvroStream(outputStream);
+    }
+
+    @Override
+    public String getMimeType() {
+        return JdbcCommon.MIME_TYPE_AVRO_BINARY;
+    }
+}
\ No newline at end of file
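
The SqlWriter interface itself is not part of this excerpt; its shape can be read off the @Override methods in DefaultAvroSqlWriter above and RecordSqlWriter below. A reconstruction for reference (the actual interface in the commit may differ):

    package org.apache.nifi.processors.standard.sql;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.sql.ResultSet;
    import java.util.Map;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;

    // Inferred from the two implementations in this commit.
    public interface SqlWriter {
        long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger,
                AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
        Map<String, String> getAttributesToAdd();
        void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
        String getMimeType();
    }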

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
new file mode 100644
index 0000000..c1a76b4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/RecordSqlWriter.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class RecordSqlWriter implements SqlWriter {
+
+    private final RecordSetWriterFactory recordSetWriterFactory;
+    private final AtomicReference<WriteResult> writeResultRef;
+    private final JdbcCommon.AvroConversionOptions options;
+    private final int maxRowsPerFlowFile;
+    private final Map<String, String> originalAttributes;
+    private ResultSetRecordSet fullRecordSet;
+    private RecordSchema writeSchema;
+    private String mimeType;
+
+    public RecordSqlWriter(RecordSetWriterFactory recordSetWriterFactory, JdbcCommon.AvroConversionOptions options, int maxRowsPerFlowFile, Map<String, String> originalAttributes) {
+        this.recordSetWriterFactory = recordSetWriterFactory;
+        this.writeResultRef = new AtomicReference<>();
+        this.maxRowsPerFlowFile = maxRowsPerFlowFile;
+        this.options = options;
+        this.originalAttributes = originalAttributes;
+    }
+
+    @Override
+    public long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
+        final RecordSet recordSet;
+        try {
+            if (fullRecordSet == null) {
+                final Schema avroSchema = JdbcCommon.createSchema(resultSet, options);
+                final RecordSchema recordAvroSchema = AvroTypeUtil.createSchema(avroSchema);
+                fullRecordSet = new ResultSetRecordSetWithCallback(resultSet, recordAvroSchema, callback);
+                writeSchema = recordSetWriterFactory.getSchema(originalAttributes, fullRecordSet.getSchema());
+            }
+            recordSet = (maxRowsPerFlowFile > 0) ? fullRecordSet.limit(maxRowsPerFlowFile) : fullRecordSet;
+
+        } catch (final SQLException | SchemaNotFoundException | IOException e) {
+            throw new ProcessException(e);
+        }
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+            writeResultRef.set(resultSetWriter.write(recordSet));
+            if (mimeType == null) {
+                mimeType = resultSetWriter.getMimeType();
+            }
+            return writeResultRef.get().getRecordCount();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public Map<String, String> getAttributesToAdd() {
+        Map<String, String> attributesToAdd = new HashMap<>();
+        attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), mimeType);
+
+        // Add any attributes from the record writer (if present)
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            if (result.getAttributes() != null) {
+                attributesToAdd.putAll(result.getAttributes());
+            }
+
+            attributesToAdd.put("record.count", 
String.valueOf(result.getRecordCount()));
+        }
+        return attributesToAdd;
+    }
+
+    @Override
+    public void updateCounters(ProcessSession session) {
+        final WriteResult result = writeResultRef.get();
+        if (result != null) {
+            session.adjustCounter("Records Written", result.getRecordCount(), 
false);
+        }
+    }
+
+    @Override
+    public void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException {
+        try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(logger, writeSchema, outputStream)) {
+            mimeType = resultSetWriter.getMimeType();
+            resultSetWriter.beginRecordSet();
+            resultSetWriter.finishRecordSet();
+        } catch (final Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    private static class ResultSetRecordSetWithCallback extends ResultSetRecordSet {
+
+        private final AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback;
+
+        ResultSetRecordSetWithCallback(ResultSet rs, RecordSchema readerSchema, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws SQLException {
+            super(rs, readerSchema);
+            this.callback = callback;
+        }
+
+        @Override
+        public Record next() throws IOException {
+            try {
+                if (hasMoreRows()) {
+                    ResultSet rs = getResultSet();
+                    final Record record = createRecord(rs);
+                    if (callback != null) {
+                        callback.processRow(rs);
+                    }
+                    setMoreRows(rs.next());
+                    return record;
+                } else {
+                    return null;
+                }
+            } catch (final SQLException e) {
+                throw new IOException("Could not obtain next record from 
ResultSet", e);
+            }
+        }
+    }
+}

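The maxRowsPerFlowFile field gives this writer a paging contract: the ResultSetRecordSet is created once and cached, and each writeResultSet call drains at most that many rows from it, so a caller can invoke the writer repeatedly, one output per call, until it reports zero rows written. A sketch of that loop follows; it is an illustration only, with the factory, result set, logger, and attribute map assumed to be supplied by the caller rather than taken from the commit.

    // Sketch only: paging a large ResultSet into fixed-size chunks with RecordSqlWriter.
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.sql.RecordSqlWriter;
    import org.apache.nifi.processors.standard.sql.SqlWriter;
    import org.apache.nifi.processors.standard.util.JdbcCommon;
    import org.apache.nifi.serialization.RecordSetWriterFactory;

    import java.io.ByteArrayOutputStream;
    import java.sql.ResultSet;
    import java.util.Map;

    public class RecordPagingSketch {
        // Hypothetical helper: the caller supplies the factory, result set, logger, and attributes.
        public static void pageResultSet(ResultSet resultSet, RecordSetWriterFactory factory,
                                         ComponentLog logger, Map<String, String> originalAttributes) throws Exception {
            final JdbcCommon.AvroConversionOptions options =
                    JdbcCommon.AvroConversionOptions.builder().build(); // default options, assumed adequate here
            final SqlWriter writer = new RecordSqlWriter(factory, options, 1000, originalAttributes);

            long rowsWritten;
            do {
                // In a processor, each chunk would back one outgoing FlowFile's content.
                final ByteArrayOutputStream chunk = new ByteArrayOutputStream();
                rowsWritten = writer.writeResultSet(resultSet, chunk, logger, null); // null: no max-value tracking
                // writer.getAttributesToAdd() now carries mime.type and record.count for this chunk
            } while (rowsWritten > 0);
        }
    }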
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
new file mode 100644
index 0000000..08fc3fd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/sql/SqlWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.sql;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.ResultSet;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * The SqlWriter interface provides a standard way for processors such as ExecuteSQL, ExecuteSQLRecord, QueryDatabaseTable, and QueryDatabaseTableRecord
+ * to write SQL result sets out to a flow file in whichever manner is appropriate. For example, ExecuteSQL writes the result set as Avro, but ExecuteSQLRecord
+ * uses the Record API to write the result set out as prescribed by the selected RecordSetWriter.
+ */
+public interface SqlWriter {
+
+    /**
+     * Writes the given result set out to the given output stream, possibly applying a callback as each row is processed.
+     * @param resultSet the ResultSet to be written
+     * @param outputStream the OutputStream to write the result set to
+     * @param logger a common logger that can be used to log messages during write
+     * @param callback a MaxValueResultSetRowCollector that may be called as each row in the ResultSet is processed
+     * @return the number of rows written to the output stream
+     * @throws Exception if any errors occur during the writing of the result set to the output stream
+     */
+    long writeResultSet(ResultSet resultSet, OutputStream outputStream, ComponentLog logger, AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception;
+
+    /**
+     * Returns a map of attribute key/value pairs to be added to any outgoing flow file(s). The default implementation returns an empty map.
+     * @return a map of attribute key/value pairs
+     */
+    default Map<String, String> getAttributesToAdd() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Updates any session counters as a result of processing result sets. The default implementation is empty; no counters will be updated.
+     * @param session the session upon which to update counters
+     */
+    default void updateCounters(ProcessSession session) {
+    }
+
+    /**
+     * Writes an empty result set to the output stream. In some cases a ResultSet might not have any viable rows, but will throw an error or
+     * behave unexpectedly if an attempt is made to retrieve them. This method indicates the implementation should write whatever output is
+     * appropriate for a result set with no rows.
+     * @param outputStream the OutputStream to write the empty result set to
+     * @param logger a common logger that can be used to log messages during write
+     * @throws IOException if any errors occur during the writing of an empty result set to the output stream
+     */
+    void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
+
+    /**
+     * Returns the MIME type of the output format. This can be used in FlowFile attributes or to perform format-specific processing as necessary.
+     * @return the MIME type string of the output format.
+     */
+    String getMimeType();
+}
\ No newline at end of file

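To illustrate the contract, here is a hypothetical implementation of the interface; it is not part of this commit. Only the three abstract methods need bodies, while getAttributesToAdd() and updateCounters() keep their defaults. The class name and CSV output format are invented for the example.

    // Hypothetical example implementation of SqlWriter (not from the commit):
    // writes each row as a comma-delimited line of text.
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processors.standard.AbstractQueryDatabaseTable;
    import org.apache.nifi.processors.standard.sql.SqlWriter;

    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;

    public class DelimitedSqlWriter implements SqlWriter {

        @Override
        public long writeResultSet(ResultSet rs, OutputStream out, ComponentLog logger,
                                   AbstractQueryDatabaseTable.MaxValueResultSetRowCollector callback) throws Exception {
            final ResultSetMetaData meta = rs.getMetaData();
            final int cols = meta.getColumnCount();
            long rows = 0;
            while (rs.next()) {
                final StringBuilder line = new StringBuilder();
                for (int i = 1; i <= cols; i++) {
                    if (i > 1) {
                        line.append(',');
                    }
                    line.append(rs.getString(i));
                }
                line.append('\n');
                out.write(line.toString().getBytes(StandardCharsets.UTF_8));
                if (callback != null) {
                    callback.processRow(rs); // let QueryDatabaseTable track max values per row
                }
                rows++;
            }
            return rows;
        }

        @Override
        public void writeEmptyResultSet(OutputStream out, ComponentLog logger) throws IOException {
            // Nothing to write for an empty delimited result; a real writer might emit a header row.
        }

        @Override
        public String getMimeType() {
            return "text/csv";
        }
    }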
http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index d21b7f4..bfe1403 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -35,6 +35,7 @@ org.apache.nifi.processors.standard.EvaluateXPath
 org.apache.nifi.processors.standard.EvaluateXQuery
 org.apache.nifi.processors.standard.ExecuteProcess
 org.apache.nifi.processors.standard.ExecuteSQL
+org.apache.nifi.processors.standard.ExecuteSQLRecord
 org.apache.nifi.processors.standard.ExecuteStreamCommand
 org.apache.nifi.processors.standard.ExtractGrok
 org.apache.nifi.processors.standard.ExtractText
@@ -96,6 +97,7 @@ org.apache.nifi.processors.standard.PutSyslog
 org.apache.nifi.processors.standard.PutTCP
 org.apache.nifi.processors.standard.PutUDP
 org.apache.nifi.processors.standard.QueryDatabaseTable
+org.apache.nifi.processors.standard.QueryDatabaseTableRecord
 org.apache.nifi.processors.standard.QueryRecord
 org.apache.nifi.processors.standard.ReplaceText
 org.apache.nifi.processors.standard.ReplaceTextWithMapping

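The two added entries register ExecuteSQLRecord and QueryDatabaseTableRecord with Java's ServiceLoader mechanism, which is how the standard-processors NAR advertises its Processor implementations to the framework. A minimal sketch of that discovery follows; it is illustrative only, not NiFi's actual loading code.

    // Sketch: enumerate Processor implementations declared in
    // META-INF/services/org.apache.nifi.processor.Processor on the classpath.
    import org.apache.nifi.processor.Processor;

    import java.util.ServiceLoader;

    public class ListProcessors {
        public static void main(String[] args) {
            for (Processor p : ServiceLoader.load(Processor.class)) {
                System.out.println(p.getClass().getName());
            }
        }
    }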