NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Signed-off-by: Pierre Villard <[email protected]>

This closes #2945.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c6572f04
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c6572f04
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c6572f04

Branch: refs/heads/master
Commit: c6572f042bf1637f6faaa2b2ffe4a56e297c6d1a
Parents: b4810b8
Author: Matthew Burgess <[email protected]>
Authored: Fri Aug 10 16:49:25 2018 -0400
Committer: Pierre Villard <[email protected]>
Committed: Sat Oct 6 10:53:11 2018 +0200

----------------------------------------------------------------------
 .../record/ResultSetRecordSet.java              |   15 +-
 .../processors/standard/AbstractExecuteSQL.java |  369 +++++
 .../standard/AbstractQueryDatabaseTable.java    |  483 +++++++
 .../nifi/processors/standard/ExecuteSQL.java    |  371 +----
 .../processors/standard/ExecuteSQLRecord.java   |  147 ++
 .../processors/standard/QueryDatabaseTable.java |  453 +-----
 .../standard/QueryDatabaseTableRecord.java      |  148 ++
 .../standard/sql/DefaultAvroSqlWriter.java      |   67 +
 .../standard/sql/RecordSqlWriter.java           |  158 +++
 .../nifi/processors/standard/sql/SqlWriter.java |   77 +
 .../org.apache.nifi.processor.Processor         |    2 +
 .../standard/QueryDatabaseTableRecordTest.java  | 1332 ++++++++++++++++++
 .../standard/QueryDatabaseTableTest.java        |    2 +-
 .../processors/standard/TestExecuteSQL.java     |   44 +-
 .../standard/TestExecuteSQLRecord.java          |  376 +++++
 15 files changed, 3261 insertions(+), 783 deletions(-)
----------------------------------------------------------------------
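
The pattern of this refactoring: the Avro-specific logic that previously lived inside ExecuteSQL and QueryDatabaseTable is hoisted into the new AbstractExecuteSQL and AbstractQueryDatabaseTable base classes, which delegate all result-set serialization to a SqlWriter strategy object. The contents of SqlWriter.java are not reproduced in this excerpt, but its shape can be inferred from the call sites in the diffs below; a minimal sketch under that assumption:

    // Inferred from the call sites below (writeResultSet, writeEmptyResultSet,
    // getMimeType, getAttributesToAdd, updateCounters); not the verbatim file contents.
    import java.io.IOException;
    import java.io.OutputStream;
    import java.sql.ResultSet;
    import java.util.Map;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processors.standard.util.JdbcCommon;

    public interface SqlWriter {
        long writeResultSet(ResultSet resultSet, OutputStream outputStream,
                ComponentLog logger, JdbcCommon.ResultSetRowCallback callback) throws Exception;
        void writeEmptyResultSet(OutputStream outputStream, ComponentLog logger) throws IOException;
        String getMimeType();
        Map<String, String> getAttributesToAdd();
        void updateCounters(ProcessSession session);
    }

DefaultAvroSqlWriter keeps the existing Avro output for ExecuteSQL and QueryDatabaseTable, while RecordSqlWriter writes through a configurable record writer for the new ExecuteSQLRecord and QueryDatabaseTableRecord processors.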


http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index 551789c..bf7d224 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -63,6 +63,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         return schema;
     }
 
+    // Protected methods for subclasses to access private member variables
+    protected ResultSet getResultSet() {
+        return rs;
+    }
+
+    protected boolean hasMoreRows() {
+        return moreRows;
+    }
+
+    protected void setMoreRows(boolean moreRows) {
+        this.moreRows = moreRows;
+    }
+
     @Override
     public Record next() throws IOException {
         try {
@@ -87,7 +100,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
         }
     }
 
-    private Record createRecord(final ResultSet rs) throws SQLException {
+    protected Record createRecord(final ResultSet rs) throws SQLException {
         final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
 
         for (final RecordField field : schema.getFields()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
new file mode 100644
index 0000000..bf46549
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public abstract class AbstractExecuteSQL extends AbstractProcessor {
+
+    public static final String RESULT_ROW_COUNT = "executesql.row.count";
+    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
+    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
+    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
+    public static final String RESULTSET_INDEX = "executesql.resultset.index";
+
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from SQL query result 
set.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("SQL query execution failed. Incoming FlowFile will 
be penalized and routed to this relationship")
+            .build();
+    protected Set<Relationship> relationships;
+
+    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
+            .name("Database Connection Pooling Service")
+            .description("The Controller Service that is used to obtain connection to database")
+            .required(true)
+            .identifiesControllerService(DBCPService.class)
+            .build();
+
+    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
+            .name("SQL select query")
+            .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
+                    + "using Expression Language. If this property is specified, it will be used regardless of the content of "
+                    + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
+                    + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
+                    + "Language is not evaluated for flow file contents.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Max Wait Time")
+            .description("The maximum amount of time allowed for a running SQL select query "
+                    + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("esql-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("esql-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
+                    + "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    protected List<PropertyDescriptor> propDescriptors;
+
+    protected DBCPService dbcpService;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
+        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
+            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
+                    + "providing flowfile(s) containing a SQL select query";
+            getLogger().error(errorString);
+            throw new ProcessException(errorString);
+        }
+        dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile fileToProcess = null;
+        if (context.hasIncomingConnection()) {
+            fileToProcess = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (fileToProcess == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+        final ComponentLog logger = getLogger();
+        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+
+        SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess);
+
+        final String selectQuery;
+        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
+            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+        } else {
+            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
+            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
+            final StringBuilder queryContents = new StringBuilder();
+            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
+            selectQuery = queryContents.toString();
+        }
+
+        int resultCount = 0;
+        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
+             final PreparedStatement st = con.prepareStatement(selectQuery)) {
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+            if (fileToProcess != null) {
+                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
+            }
+            logger.debug("Executing query {}", new Object[]{selectQuery});
+
+            int fragmentIndex = 0;
+            final String fragmentId = UUID.randomUUID().toString();
+
+            final StopWatch executionTime = new StopWatch(true);
+
+            boolean hasResults = st.execute();
+
+            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
+            boolean hasUpdateCount = st.getUpdateCount() != -1;
+
+            while (hasResults || hasUpdateCount) {
+                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
+                if (hasResults) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+
+                    try {
+                        final ResultSet resultSet = st.getResultSet();
+                        do {
+                            final StopWatch fetchTime = new StopWatch(true);
+
+                            FlowFile resultSetFF;
+                            if (fileToProcess == null) {
+                                resultSetFF = session.create();
+                            } else {
+                                resultSetFF = session.create(fileToProcess);
+                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+                            }
+
+                            try {
+                                resultSetFF = session.write(resultSetFF, out -> {
+                                    try {
+                                        nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
+                                    } catch (Exception e) {
+                                        throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e);
+                                    }
+                                });
+
+                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+                                // set attributes
+                                final Map<String, String> attributesToAdd = new HashMap<>();
+                                attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                                attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+                                attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
+                                attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+                                attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount));
+                                attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+                                resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd);
+                                sqlWriter.updateCounters(session);
+
+                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
+                                if (maxRowsPerFlowFile > 0) {
+                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
+                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
+                                        session.remove(resultSetFF);
+                                        break;
+                                    }
+
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                                }
+
+                                logger.info("{} contains {} records; 
transferring to 'success'",
+                                        new Object[]{resultSetFF, 
nrOfRows.get()});
+                                // Report a FETCH event if there was an 
incoming flow file, or a RECEIVE event otherwise
+                                if(context.hasIncomingConnection()) {
+                                    
session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + 
nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                } else {
+                                    
session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + 
nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
+                                }
+                                resultSetFlowFiles.add(resultSetFF);
+
+                                // If we've reached the batch size, send out the flow files
+                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
+                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                                    session.commit();
+                                    resultSetFlowFiles.clear();
+                                }
+
+                                fragmentIndex++;
+                            } catch (Exception e) {
+                                // Remove the result set flow file and propagate the exception
+                                session.remove(resultSetFF);
+                                if (e instanceof ProcessException) {
+                                    throw (ProcessException) e;
+                                } else {
+                                    throw new ProcessException(e);
+                                }
+                            }
+                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
+
+                        // If we are splitting results but not outputting batches, set count on all FlowFiles
+                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
+                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                                resultSetFlowFiles.set(i,
+                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
+                            }
+                        }
+                    } catch (final SQLException e) {
+                        throw new ProcessException(e);
+                    }
+
+                    resultCount++;
+                }
+
+                // are there anymore result sets?
+                try {
+                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
+                    hasUpdateCount = st.getUpdateCount() != -1;
+                } catch (SQLException ex) {
+                    hasResults = false;
+                    hasUpdateCount = false;
+                }
+            }
+
+            // Transfer any remaining files to SUCCESS
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+            resultSetFlowFiles.clear();
+
+            //If we had at least one result then it's OK to drop the original file, but if we had no results then
+            //  pass the original flow file down the line to trigger downstream processors
+            if (fileToProcess != null) {
+                if (resultCount > 0) {
+                    session.remove(fileToProcess);
+                } else {
+                    fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
+                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                    session.transfer(fileToProcess, REL_SUCCESS);
+                }
+            } else if (resultCount == 0) {
+                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
+                // Then generate an empty Output FlowFile
+                FlowFile resultSetFF = session.create();
+
+                resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger()));
+                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
+                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType());
+                session.transfer(resultSetFF, REL_SUCCESS);
+            }
+        } catch (final ProcessException | SQLException e) {
+            //If we had at least one result then it's OK to drop the original file, but if we had no results then
+            //  pass the original flow file down the line to trigger downstream processors
+            if (fileToProcess == null) {
+                // This can happen if any exceptions occur while setting up the connection, statement, etc.
+                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
+                        new Object[]{selectQuery, e});
+                context.yield();
+            } else {
+                if (context.hasIncomingConnection()) {
+                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
+                            new Object[]{selectQuery, fileToProcess, e});
+                    fileToProcess = session.penalize(fileToProcess);
+                } else {
+                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
+                            new Object[]{selectQuery, e});
+                    context.yield();
+                }
+                session.transfer(fileToProcess, REL_FAILURE);
+            }
+        }
+    }
+
+    protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess);
+}
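
With the template method above in place, a concrete processor only supplies the SqlWriter. A minimal sketch of a subclass (ExecuteSQLRecord itself is not reproduced in this excerpt; the DefaultAvroSqlWriter constructor taking JdbcCommon.AvroConversionOptions is assumed from the import changes in the ExecuteSQL diff below):

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
    import org.apache.nifi.processors.standard.sql.SqlWriter;
    import org.apache.nifi.processors.standard.util.JdbcCommon;

    public class MyExecuteSQL extends AbstractExecuteSQL {
        @Override
        protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
            // Cap each output FlowFile at the configured row count (0 means unlimited)
            final Integer maxRows = context.getProperty(MAX_ROWS_PER_FLOW_FILE)
                    .evaluateAttributeExpressions().asInteger();
            final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                    .maxRows(maxRows == null ? 0 : maxRows)
                    .build();
            return new DefaultAvroSqlWriter(options); // assumed constructor signature
        }
    }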

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
new file mode 100644
index 0000000..06df6c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.db.DatabaseAdapter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+
+public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor {
+
+    public static final String RESULT_TABLENAME = "tablename";
+    public static final String RESULT_ROW_COUNT = "querydbtable.row.count";
+
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+
+    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Fetch Size")
+            .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be "
+                    + "honored and/or exact. If the value specified is zero, then the hint is ignored.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
+            .name("qdbt-max-rows")
+            .displayName("Max Rows Per Flow File")
+            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
+                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("qdbt-output-batch-size")
+            .displayName("Output Batch Size")
+            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
+                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
+                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
+                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this "
+                    + "property is set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder()
+            .name("qdbt-max-frags")
+            .displayName("Maximum Number of Fragments")
+            .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " +
+                    "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are "
+                    + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.")
+            .defaultValue("0")
+            .required(true)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .required(false)
+                .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+                .dynamic(true)
+                .build();
+    }
+
+    @OnScheduled
+    public void setup(final ProcessContext context) {
+        maxValueProperties = getDefaultMaxValueProperties(context, null);
+    }
+
+    @OnStopped
+    public void stop() {
+        // Reset the column type map in case properties change
+        setupComplete.set(false);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
+        // Fetch the column/table info once
+        if (!setupComplete.get()) {
+            super.setup(context);
+        }
+        ProcessSession session = sessionFactory.createSession();
+        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+        final ComponentLog logger = getLogger();
+
+        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+        final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
+        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
+        final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue();
+        final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue();
+        final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
+        final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet()
+                ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger()
+                : 0;
+
+
+        SqlWriter sqlWriter = configureSqlWriter(session, context);
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to retrieve observed maximum values from 
the State Manager. Will not perform "
+                    + "query until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+        // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually
+        // set as the current state map (after the session has been committed)
+        final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap());
+
+        //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map
+        for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) {
+            String maxPropKey = maxProp.getKey().toLowerCase();
+            String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter);
+            if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) {
+                String newMaxPropValue;
+                // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                // the value has been stored under a key that is only the column name. Fall back to check the column name,
+                // but store the new initial max value under the fully-qualified key.
+                if (statePropertyMap.containsKey(maxPropKey)) {
+                    newMaxPropValue = statePropertyMap.get(maxPropKey);
+                } else {
+                    newMaxPropValue = maxProp.getValue();
+                }
+                statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue);
+
+            }
+        }
+
+        List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames)
+                ? null
+                : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*"));
+        final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap);
+        final StopWatch stopWatch = new StopWatch(true);
+        final String fragmentIdentifier = UUID.randomUUID().toString();
+
+        try (final Connection con = dbcpService.getConnection(Collections.emptyMap());
+             final Statement st = con.createStatement()) {
+
+            if (fetchSize != null && fetchSize > 0) {
+                try {
+                    st.setFetchSize(fetchSize);
+                } catch (SQLException se) {
+                    // Not all drivers support this, just log the error (at debug level) and move on
+                    logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
+                }
+            }
+
+            String jdbcURL = "DBCPService";
+            try {
+                DatabaseMetaData databaseMetaData = con.getMetaData();
+                if (databaseMetaData != null) {
+                    jdbcURL = databaseMetaData.getURL();
+                }
+            } catch (SQLException se) {
+                // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly
+            }
+
+            final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue();
+            st.setQueryTimeout(queryTimeout); // timeout in seconds
+            if (logger.isDebugEnabled()) {
+                logger.debug("Executing query {}", new Object[] { selectQuery });
+            }
+            try (final ResultSet resultSet = st.executeQuery(selectQuery)) {
+                int fragmentIndex=0;
+                // Max values will be updated in the state property map by the callback
+                final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter);
+
+                while(true) {
+                    final AtomicLong nrOfRows = new AtomicLong(0L);
+
+                    FlowFile fileToProcess = session.create();
+                    try {
+                        fileToProcess = session.write(fileToProcess, out -> {
+                            try {
+                                nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector));
+                            } catch (Exception e) {
+                                throw new ProcessException("Error during database query or conversion of records.", e);
+                            }
+                        });
+                    } catch (ProcessException e) {
+                        // Add flowfile to results before rethrowing so it will be removed from session in outer catch
+                        resultSetFlowFiles.add(fileToProcess);
+                        throw e;
+                    }
+
+                    if (nrOfRows.get() > 0) {
+                        // set attributes
+                        final Map<String, String> attributesToAdd = new HashMap<>();
+                        attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
+                        attributesToAdd.put(RESULT_TABLENAME, tableName);
+
+                        if(maxRowsPerFlowFile > 0) {
+                            attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier);
+                            attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                        }
+
+                        attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+                        fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd);
+                        sqlWriter.updateCounters(session);
+
+                        logger.info("{} contains {} records; transferring to 
'success'",
+                                new Object[]{fileToProcess, nrOfRows.get()});
+
+                        session.getProvenanceReporter().receive(fileToProcess, 
jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                        resultSetFlowFiles.add(fileToProcess);
+                        // If we've reached the batch size, send out the flow 
files
+                        if (outputBatchSize > 0 && resultSetFlowFiles.size() 
>= outputBatchSize) {
+                            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+                            session.commit();
+                            resultSetFlowFiles.clear();
+                        }
+                    } else {
+                        // If there were no rows returned, don't send the flowfile
+                        session.remove(fileToProcess);
+                        // If no rows and this was first FlowFile, yield
+                        if(fragmentIndex == 0){
+                            context.yield();
+                        }
+                        break;
+                    }
+
+                    fragmentIndex++;
+                    if (maxFragments > 0 && fragmentIndex >= maxFragments) {
+                        break;
+                    }
+
+                    // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around
+                    if (maxFragments == 0 && maxRowsPerFlowFile == 0) {
+                        break;
+                    }
+
+                    // If we are splitting up the data into flow files, don't loop back around if we've gotten all results
+                    if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) {
+                        break;
+                    }
+                }
+
+                // Apply state changes from the Max Value tracker
+                maxValCollector.applyStateChanges();
+
+                // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes
+                if (outputBatchSize == 0) {
+                    for (int i = 0; i < resultSetFlowFiles.size(); i++) {
+                        // Add maximum values as attributes
+                        for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) {
+                            // Get just the column name from the key
+                            String key = entry.getKey();
+                            String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length());
+                            resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue()));
+                        }
+
+                        //set count on all FlowFiles
+                        if (maxRowsPerFlowFile > 0) {
+                            resultSetFlowFiles.set(i,
+                                    session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex)));
+                        }
+                    }
+                }
+            } catch (final SQLException e) {
+                throw e;
+            }
+
+            session.transfer(resultSetFlowFiles, REL_SUCCESS);
+
+        } catch (final ProcessException | SQLException e) {
+            logger.error("Unable to execute SQL select query {} due to {}", 
new Object[]{selectQuery, e});
+            if (!resultSetFlowFiles.isEmpty()) {
+                session.remove(resultSetFlowFiles);
+            }
+            context.yield();
+        } finally {
+            session.commit();
+            try {
+                // Update the state
+                stateManager.setState(statePropertyMap, Scope.CLUSTER);
+            } catch (IOException ioe) {
+                getLogger().error("{} failed to update State Manager, maximum 
observed values will not be recorded", new Object[]{this, ioe});
+            }
+        }
+    }
+
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames,
+                              String customWhereClause, Map<String, String> stateMap) {
+
+        return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap);
+    }
+
+    protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames,
+                              String customWhereClause, Map<String, String> stateMap) {
+        if (StringUtils.isEmpty(tableName)) {
+            throw new IllegalArgumentException("Table name must be specified");
+        }
+        final StringBuilder query;
+
+        if (StringUtils.isEmpty(sqlQuery)) {
+            query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null));
+        } else {
+            query = getWrappedQuery(dbAdapter, sqlQuery, tableName);
+        }
+
+        List<String> whereClauses = new ArrayList<>();
+        // Check state map for last max values
+        if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) {
+            IntStream.range(0, maxValColumnNames.size()).forEach((index) -> {
+                String colName = maxValColumnNames.get(index);
+                String maxValueKey = getStateKey(tableName, colName, dbAdapter);
+                String maxValue = stateMap.get(maxValueKey);
+                if (StringUtils.isEmpty(maxValue)) {
+                    // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                    // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+                    // maximum value is observed, it will be stored under the fully-qualified key from then on.
+                    maxValue = stateMap.get(colName.toLowerCase());
+                }
+                if (!StringUtils.isEmpty(maxValue)) {
+                    Integer type = columnTypeMap.get(maxValueKey);
+                    if (type == null) {
+                        // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled.
+                        throw new IllegalArgumentException("No column type found for: " + colName);
+                    }
+                    // Add a condition for the WHERE clause
+                    whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName()));
+                }
+            });
+        }
+
+        if (customWhereClause != null) {
+            whereClauses.add("(" + customWhereClause + ")");
+        }
+
+        if (!whereClauses.isEmpty()) {
+            query.append(" WHERE ");
+            query.append(StringUtils.join(whereClauses, " AND "));
+        }
+
+        return query.toString();
+    }
+
+    public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback {
+        DatabaseAdapter dbAdapter;
+        final Map<String, String> newColMap;
+        final Map<String, String> originalState;
+        String tableName;
+
+        public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) {
+            this.dbAdapter = dbAdapter;
+            this.originalState = stateMap;
+
+            this.newColMap = new HashMap<>();
+            this.newColMap.putAll(stateMap);
+
+            this.tableName = tableName;
+        }
+
+        @Override
+        public void processRow(ResultSet resultSet) throws IOException {
+            if (resultSet == null) {
+                return;
+            }
+            try {
+                // Iterate over the row, check-and-set max values
+                final ResultSetMetaData meta = resultSet.getMetaData();
+                final int nrOfColumns = meta.getColumnCount();
+                if (nrOfColumns > 0) {
+                    for (int i = 1; i <= nrOfColumns; i++) {
+                        String colName = meta.getColumnName(i).toLowerCase();
+                        String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter);
+                        Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey);
+                        // Skip any columns we're not keeping track of or whose value is null
+                        if (type == null || resultSet.getObject(i) == null) {
+                            continue;
+                        }
+                        String maxValueString = newColMap.get(fullyQualifiedMaxValueKey);
+                        // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme)
+                        // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new
+                        // maximum value is observed, it will be stored under the fully-qualified key from then on.
+                        if (StringUtils.isEmpty(maxValueString)) {
+                            maxValueString = newColMap.get(colName);
+                        }
+                        String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName());
+                        if (newMaxValueString != null) {
+                            newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString);
+                        }
+                    }
+                }
+            } catch (ParseException | SQLException e) {
+                throw new IOException(e);
+            }
+        }
+
+        @Override
+        public void applyStateChanges() {
+            this.originalState.putAll(this.newColMap);
+        }
+    }
+
+    protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context);
+}
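
A concrete example of what getQuery() produces: for a table orders with Maximum-value Columns "id, updated_at" and stored state id=1000 and updated_at=2018-01-01 00:00:00 (table and values illustrative), the generated statement would be along the lines of

    SELECT * FROM orders WHERE id > 1000 AND updated_at >= '2018-01-01 00:00:00'

Only the first max-value column gets a strict > comparison; later columns use >= (the index == 0 ? " > " : " >= " ternary above), so rows that tie with the stored maximum on an earlier column are not skipped. A custom WHERE clause, if set, is ANDed on in parentheses, and literal quoting is delegated to getLiteralByType() based on the column's JDBC type.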

http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index df82e2e..cc6d508 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -16,22 +16,12 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.nio.charset.Charset;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -41,23 +31,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.dbcp.DBCPService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.flowfile.attributes.FragmentAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
+import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
 import org.apache.nifi.processors.standard.util.JdbcCommon;
-import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.processors.standard.util.AvroUtil.CodecType;
 
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION;
 import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE;
@@ -94,99 +77,24 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
                 + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")
 })
 @WritesAttributes({
-    @WritesAttribute(attribute="executesql.row.count", description = "Contains 
the number of rows returned in the select query"),
-    @WritesAttribute(attribute="executesql.query.duration", description = 
"Combined duration of the query execution time and fetch time in milliseconds"),
-    @WritesAttribute(attribute="executesql.query.executiontime", description = 
"Duration of the query execution time in milliseconds"),
-    @WritesAttribute(attribute="executesql.query.fetchtime", description = 
"Duration of the result set fetch time in milliseconds"),
-    @WritesAttribute(attribute="executesql.resultset.index", description = 
"Assuming multiple result sets are returned, "
-       + "the zero based index of this result set."),
-    @WritesAttribute(attribute="fragment.identifier", description="If 'Max 
Rows Per Flow File' is set then all FlowFiles from the same query result set "
-            + "will have the same value for the fragment.identifier attribute. 
This can then be used to correlate the results."),
-    @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows 
Per Flow File' is set then this is the total number of  "
-            + "FlowFiles produced by a single ResultSet. This can be used in 
conjunction with the "
-            + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, 
then this "
-            + "attribute will not be populated."),
-    @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per 
Flow File' is set then the position of this FlowFile in the list of "
-            + "outgoing FlowFiles that were all derived from the same result 
set FlowFile. This can be "
-            + "used in conjunction with the fragment.identifier attribute to 
know which FlowFiles originated from the same query result set and in what 
order  "
+        @WritesAttribute(attribute = "executesql.row.count", description = 
"Contains the number of rows returned in the select query"),
+        @WritesAttribute(attribute = "executesql.query.duration", description 
= "Combined duration of the query execution time and fetch time in 
milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.executiontime", 
description = "Duration of the query execution time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.query.fetchtime", description 
= "Duration of the result set fetch time in milliseconds"),
+        @WritesAttribute(attribute = "executesql.resultset.index", description 
= "Assuming multiple result sets are returned, "
+                + "the zero based index of this result set."),
+        @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Rows Per Flow File' is set then all FlowFiles from the same query result 
set "
+                + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
+        @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Rows Per Flow File' is set then this is the total number of  "
+                + "FlowFiles produced by a single ResultSet. This can be used 
in conjunction with the "
+                + "fragment.identifier attribute in order to know how many 
FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, 
then this "
+                + "attribute will not be populated."),
+        @WritesAttribute(attribute = "fragment.index", description = "If 'Max 
Rows Per Flow File' is set then the position of this FlowFile in the list of "
+                + "outgoing FlowFiles that were all derived from the same 
result set FlowFile. This can be "
+                + "used in conjunction with the fragment.identifier attribute 
to know which FlowFiles originated from the same query result set and in what 
order  "
                 + "FlowFiles were produced")
 })
-public class ExecuteSQL extends AbstractProcessor {
-
-    public static final String RESULT_ROW_COUNT = "executesql.row.count";
-    public static final String RESULT_QUERY_DURATION = "executesql.query.duration";
-    public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime";
-    public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime";
-    public static final String RESULTSET_INDEX = "executesql.resultset.index";
-
-    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
-    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
-    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
-
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully created FlowFile from SQL query result 
set.")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("SQL query execution failed. Incoming FlowFile will 
be penalized and routed to this relationship")
-            .build();
-    private final Set<Relationship> relationships;
-
-    public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder()
-            .name("Database Connection Pooling Service")
-            .description("The Controller Service that is used to obtain connection to database")
-            .required(true)
-            .identifiesControllerService(DBCPService.class)
-            .build();
-
-    public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder()
-            .name("SQL select query")
-            .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes "
-                    + "using Expression Language. If this property is specified, it will be used regardless of the content of "
-                    + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected "
-                    + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression "
-                    + "Language is not evaluated for flow file contents.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder()
-            .name("Max Wait Time")
-            .description("The maximum amount of time allowed for a running SQL select query "
-                    + " , zero means there is no limit. Max time less than 1 second will be equal to zero.")
-            .defaultValue("0 seconds")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .sensitive(false)
-            .build();
-
-    public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder()
-            .name("esql-max-rows")
-            .displayName("Max Rows Per Flow File")
-            .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large "
-                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("esql-output-batch-size")
-            .displayName("Output Batch Size")
-            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
-                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
-                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
-                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
-                    + "property is set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
+public class ExecuteSQL extends AbstractExecuteSQL {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
             .name("compression-format")
@@ -198,8 +106,6 @@ public class ExecuteSQL extends AbstractProcessor {
             .required(true)
             .build();
 
-    private final List<PropertyDescriptor> propDescriptors;
-
     public ExecuteSQL() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
@@ -212,248 +118,31 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(COMPRESSION_FORMAT);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
-        pds.add(COMPRESSION_FORMAT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propDescriptors;
-    }
-
-    @OnScheduled
-    public void setup(ProcessContext context) {
-        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
-        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
-            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
-                    + "providing flowfile(s) containing a SQL select query";
-            getLogger().error(errorString);
-            throw new ProcessException(errorString);
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile fileToProcess = null;
-        if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
-
-            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
-            // However, if we have no FlowFile and we have connections coming from other Processors, then
-            // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
-                return;
-            }
-        }
-
-        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
-        final ComponentLog logger = getLogger();
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
-        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
         final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
 
-        final String selectQuery;
-        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
-            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        } else {
-            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
-            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
-            final StringBuilder queryContents = new StringBuilder();
-            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
-            selectQuery = queryContents.toString();
-        }
-
-        int resultCount=0;
-        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
-            final PreparedStatement st = con.prepareStatement(selectQuery)) {
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-
-            if (fileToProcess != null) {
-                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
-            }
-            logger.debug("Executing query {}", new Object[]{selectQuery});
-
-            int fragmentIndex=0;
-            final String fragmentId = UUID.randomUUID().toString();
-
-            final StopWatch executionTime = new StopWatch(true);
-
-            boolean hasResults = st.execute();
-
-            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
-
-            boolean hasUpdateCount = st.getUpdateCount() != -1;
-
-            while(hasResults || hasUpdateCount) {
-                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
-                if (hasResults) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-
-                    try {
-                        final ResultSet resultSet = st.getResultSet();
-                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .maxRows(maxRowsPerFlowFile)
-                                .codecFactory(codec)
-                                .build();
-
-                        do {
-                            final StopWatch fetchTime = new StopWatch(true);
-
-                            FlowFile resultSetFF;
-                            if (fileToProcess == null) {
-                                resultSetFF = session.create();
-                            } else {
-                                resultSetFF = session.create(fileToProcess);
-                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                            }
-
-                            try {
-                                resultSetFF = session.write(resultSetFF, out -> {
-                                    try {
-                                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                                    } catch (SQLException e) {
-                                        throw new ProcessException(e);
-                                    }
-                                });
-
-                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
-                                // set attribute how many rows were selected
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                                resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
-                                if (maxRowsPerFlowFile > 0) {
-                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
-                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
-                                        session.remove(resultSetFF);
-                                        break;
-                                    }
-
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                                }
-
-                                logger.info("{} contains {} Avro records; 
transferring to 'success'",
-                                        new Object[]{resultSetFF, 
nrOfRows.get()});
-                                
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + 
nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                                resultSetFlowFiles.add(resultSetFF);
-
-                                // If we've reached the batch size, send out 
the flow files
-                                if (outputBatchSize > 0 && 
resultSetFlowFiles.size() >= outputBatchSize) {
-                                    session.transfer(resultSetFlowFiles, 
REL_SUCCESS);
-                                    session.commit();
-                                    resultSetFlowFiles.clear();
-                                }
-
-                                fragmentIndex++;
-                            } catch (Exception e) {
-                                // Remove the result set flow file and propagate the exception
-                                session.remove(resultSetFF);
-                                if (e instanceof ProcessException) {
-                                    throw (ProcessException) e;
-                                } else {
-                                    throw new ProcessException(e);
-                                }
-                            }
-                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
-
-                        // If we are splitting results but not outputting batches, set count on all FlowFiles
-                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
-                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                                resultSetFlowFiles.set(i,
-                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
-                            }
-                        }
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
-
-                    resultCount++;
-                }
-
-                // are there anymore result sets?
-                try{
-                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
-                    hasUpdateCount = st.getUpdateCount() != -1;
-                } catch(SQLException ex){
-                    hasResults = false;
-                    hasUpdateCount = false;
-                }
-            }
-
-            // Transfer any remaining files to SUCCESS
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-            resultSetFlowFiles.clear();
-
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            //  pass the original flow file down the line to trigger downstream processors
-            if(fileToProcess != null){
-                if(resultCount > 0){
-                    session.remove(fileToProcess);
-                } else {
-                    fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
-
-                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
-                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                    session.transfer(fileToProcess, REL_SUCCESS);
-                }
-            } else if(resultCount == 0){
-                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
-                // Then generate an empty Output FlowFile
-                FlowFile resultSetFF = session.create();
-
-                resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
-
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
-                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                session.transfer(resultSetFF, REL_SUCCESS);
-            }
-        } catch (final ProcessException | SQLException e) {
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            //  pass the original flow file down the line to trigger downstream processors
-            if (fileToProcess == null) {
-                // This can happen if any exceptions occur while setting up the connection, statement, etc.
-                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
-                context.yield();
-            } else {
-                if (context.hasIncomingConnection()) {
-                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
-                            new Object[]{selectQuery, fileToProcess, e});
-                    fileToProcess = session.penalize(fileToProcess);
-                } else {
-                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
-                            new Object[]{selectQuery, e});
-                    context.yield();
-                }
-                session.transfer(fileToProcess, REL_FAILURE);
-            }
-        }
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .defaultPrecision(defaultPrecision)
+                .defaultScale(defaultScale)
+                .maxRows(maxRowsPerFlowFile)
+                .codecFactory(codec)
+                .build();
+        return new DefaultAvroSqlWriter(options);
     }
 }
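
The net effect of the hunks above is a template-method split: AbstractExecuteSQL now owns connection handling, statement execution, result-set fragmenting, output batching, and attribute bookkeeping, while each concrete processor only chooses how rows are serialized by returning a SqlWriter from configureSqlWriter(). As a rough sketch of that extension point (not part of this commit's sources: the class name ExecuteSQLAvroExample is hypothetical, relationships and propDescriptors are assumed to be protected fields inherited from AbstractExecuteSQL as the retained constructor code above suggests, and builder defaults are assumed for the AvroConversionOptions not set explicitly):

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter;
import org.apache.nifi.processors.standard.sql.SqlWriter;
import org.apache.nifi.processors.standard.util.JdbcCommon;

// Hypothetical minimal subclass, for illustration only. The abstract parent
// drives onTrigger() and calls back into configureSqlWriter() to obtain the
// writer that serializes each ResultSet it produces.
public class ExecuteSQLAvroExample extends AbstractExecuteSQL {

    public ExecuteSQLAvroExample() {
        // relationships/propDescriptors are assumed to be protected fields of
        // AbstractExecuteSQL; the ExecuteSQL constructor assigns them the same way.
        final Set<Relationship> r = new HashSet<>();
        r.add(REL_SUCCESS);
        r.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(r);

        final List<PropertyDescriptor> pds = new ArrayList<>();
        pds.add(DBCP_SERVICE);
        pds.add(SQL_SELECT_QUERY);
        pds.add(QUERY_TIMEOUT);
        pds.add(MAX_ROWS_PER_FLOW_FILE);
        pds.add(OUTPUT_BATCH_SIZE);
        propDescriptors = Collections.unmodifiableList(pds);
    }

    @Override
    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
        // Emit plain Avro, splitting at Max Rows Per Flow File just as
        // ExecuteSQL does; remaining conversion options keep their defaults.
        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
                .maxRows(context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger())
                .build();
        return new DefaultAvroSqlWriter(options);
    }
}

The new ExecuteSQLRecord and QueryDatabaseTableRecord processors presumably follow the same shape with a RecordSqlWriter in place of DefaultAvroSqlWriter, which is what allows this commit to delete several hundred lines from each concrete processor while adding the record-oriented variants.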
