NIFI-4517: Added ExecuteSQLRecord and QueryDatabaseTableRecord processors

Signed-off-by: Pierre Villard <[email protected]>
This closes #2945. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c6572f04 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c6572f04 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c6572f04 Branch: refs/heads/master Commit: c6572f042bf1637f6faaa2b2ffe4a56e297c6d1a Parents: b4810b8 Author: Matthew Burgess <[email protected]> Authored: Fri Aug 10 16:49:25 2018 -0400 Committer: Pierre Villard <[email protected]> Committed: Sat Oct 6 10:53:11 2018 +0200 ---------------------------------------------------------------------- .../record/ResultSetRecordSet.java | 15 +- .../processors/standard/AbstractExecuteSQL.java | 369 +++++ .../standard/AbstractQueryDatabaseTable.java | 483 +++++++ .../nifi/processors/standard/ExecuteSQL.java | 371 +---- .../processors/standard/ExecuteSQLRecord.java | 147 ++ .../processors/standard/QueryDatabaseTable.java | 453 +----- .../standard/QueryDatabaseTableRecord.java | 148 ++ .../standard/sql/DefaultAvroSqlWriter.java | 67 + .../standard/sql/RecordSqlWriter.java | 158 +++ .../nifi/processors/standard/sql/SqlWriter.java | 77 + .../org.apache.nifi.processor.Processor | 2 + .../standard/QueryDatabaseTableRecordTest.java | 1332 ++++++++++++++++++ .../standard/QueryDatabaseTableTest.java | 2 +- .../processors/standard/TestExecuteSQL.java | 44 +- .../standard/TestExecuteSQLRecord.java | 376 +++++ 15 files changed, 3261 insertions(+), 783 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index 551789c..bf7d224 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -63,6 +63,19 @@ public class ResultSetRecordSet implements RecordSet, Closeable { return schema; } + // Protected methods for subclasses to access private member variables + protected ResultSet getResultSet() { + return rs; + } + + protected boolean hasMoreRows() { + return moreRows; + } + + protected void setMoreRows(boolean moreRows) { + this.moreRows = moreRows; + } + @Override public Record next() throws IOException { try { @@ -87,7 +100,7 @@ public class ResultSetRecordSet implements RecordSet, Closeable { } } - private Record createRecord(final ResultSet rs) throws SQLException { + protected Record createRecord(final ResultSet rs) throws SQLException { final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); for (final RecordField field : schema.getFields()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java new file mode 100644 index 0000000..bf46549 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.sql.SqlWriter; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.StopWatch; + +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + + +public abstract class AbstractExecuteSQL extends AbstractProcessor { + + public static final String RESULT_ROW_COUNT = "executesql.row.count"; + public static final String RESULT_QUERY_DURATION = "executesql.query.duration"; + public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime"; + public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; + public static final String RESULTSET_INDEX = "executesql.resultset.index"; + + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully created FlowFile from SQL query result 
set.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") + .build(); + protected Set<Relationship> relationships; + + public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() + .name("Database Connection Pooling Service") + .description("The Controller Service that is used to obtain connection to database") + .required(true) + .identifiesControllerService(DBCPService.class) + .build(); + + public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() + .name("SQL select query") + .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes " + + "using Expression Language. If this property is specified, it will be used regardless of the content of " + + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " + + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression " + + "Language is not evaluated for flow file contents.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Max Wait Time") + .description("The maximum amount of time allowed for a running SQL select query " + + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") + .defaultValue("0 seconds") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("esql-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " + + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("esql-output-batch-size") + .displayName("Output Batch Size") + .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " + + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " + + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " + + "be committed, thus releasing the FlowFiles to the downstream relationship. 
NOTE: The fragment.count attribute will not be set on FlowFiles when this " + + "property is set.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + protected List<PropertyDescriptor> propDescriptors; + + protected DBCPService dbcpService; + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @OnScheduled + public void setup(ProcessContext context) { + // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization + if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) { + final String errorString = "Either the Select Query must be specified or there must be an incoming connection " + + "providing flowfile(s) containing a SQL select query"; + getLogger().error(errorString); + throw new ProcessException(errorString); + } + dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile fileToProcess = null; + if (context.hasIncomingConnection()) { + fileToProcess = session.get(); + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (fileToProcess == null && context.hasNonLoopConnection()) { + return; + } + } + + final List<FlowFile> resultSetFlowFiles = new ArrayList<>(); + + final ComponentLog logger = getLogger(); + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField; + + SqlWriter sqlWriter = configureSqlWriter(session, context, fileToProcess); + + final String selectQuery; + if (context.getProperty(SQL_SELECT_QUERY).isSet()) { + selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); + } else { + // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query. + // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled. + final StringBuilder queryContents = new StringBuilder(); + session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset()))); + selectQuery = queryContents.toString(); + } + + int resultCount = 0; + try (final Connection con = dbcpService.getConnection(fileToProcess == null ? 
Collections.emptyMap() : fileToProcess.getAttributes()); + final PreparedStatement st = con.prepareStatement(selectQuery)) { + st.setQueryTimeout(queryTimeout); // timeout in seconds + + if (fileToProcess != null) { + JdbcCommon.setParameters(st, fileToProcess.getAttributes()); + } + logger.debug("Executing query {}", new Object[]{selectQuery}); + + int fragmentIndex = 0; + final String fragmentId = UUID.randomUUID().toString(); + + final StopWatch executionTime = new StopWatch(true); + + boolean hasResults = st.execute(); + + long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS); + + boolean hasUpdateCount = st.getUpdateCount() != -1; + + while (hasResults || hasUpdateCount) { + //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet + if (hasResults) { + final AtomicLong nrOfRows = new AtomicLong(0L); + + try { + final ResultSet resultSet = st.getResultSet(); + do { + final StopWatch fetchTime = new StopWatch(true); + + FlowFile resultSetFF; + if (fileToProcess == null) { + resultSetFF = session.create(); + } else { + resultSetFF = session.create(fileToProcess); + resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); + } + + try { + resultSetFF = session.write(resultSetFF, out -> { + try { + nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null)); + } catch (Exception e) { + throw (e instanceof ProcessException) ? (ProcessException) e : new ProcessException(e); + } + }); + + long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS); + + // set attributes + final Map<String, String> attributesToAdd = new HashMap<>(); + attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + attributesToAdd.put(RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed)); + attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed)); + attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed)); + attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount)); + attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); + resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd); + sqlWriter.updateCounters(session); + + // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes + if (maxRowsPerFlowFile > 0) { + // if row count is zero and this is not the first fragment, drop it instead of committing it. 
+ if (nrOfRows.get() == 0 && fragmentIndex > 0) { + session.remove(resultSetFF); + break; + } + + resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId); + resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex)); + } + + logger.info("{} contains {} records; transferring to 'success'", + new Object[]{resultSetFF, nrOfRows.get()}); + // Report a FETCH event if there was an incoming flow file, or a RECEIVE event otherwise + if(context.hasIncomingConnection()) { + session.getProvenanceReporter().fetch(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + } else { + session.getProvenanceReporter().receive(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed); + } + resultSetFlowFiles.add(resultSetFF); + + // If we've reached the batch size, send out the flow files + if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { + session.transfer(resultSetFlowFiles, REL_SUCCESS); + session.commit(); + resultSetFlowFiles.clear(); + } + + fragmentIndex++; + } catch (Exception e) { + // Remove the result set flow file and propagate the exception + session.remove(resultSetFF); + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException(e); + } + } + } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile); + + // If we are splitting results but not outputting batches, set count on all FlowFiles + if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex))); + } + } + } catch (final SQLException e) { + throw new ProcessException(e); + } + + resultCount++; + } + + // are there anymore result sets? 
+ try { + hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT); + hasUpdateCount = st.getUpdateCount() != -1; + } catch (SQLException ex) { + hasResults = false; + hasUpdateCount = false; + } + } + + // Transfer any remaining files to SUCCESS + session.transfer(resultSetFlowFiles, REL_SUCCESS); + resultSetFlowFiles.clear(); + + //If we had at least one result then it's OK to drop the original file, but if we had no results then + // pass the original flow file down the line to trigger downstream processors + if (fileToProcess != null) { + if (resultCount > 0) { + session.remove(fileToProcess); + } else { + fileToProcess = session.write(fileToProcess, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); + fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0"); + fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); + session.transfer(fileToProcess, REL_SUCCESS); + } + } else if (resultCount == 0) { + //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only) + // Then generate an empty Output FlowFile + FlowFile resultSetFF = session.create(); + + resultSetFF = session.write(resultSetFF, out -> sqlWriter.writeEmptyResultSet(out, getLogger())); + resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0"); + resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), sqlWriter.getMimeType()); + session.transfer(resultSetFF, REL_SUCCESS); + } + } catch (final ProcessException | SQLException e) { + //If we had at least one result then it's OK to drop the original file, but if we had no results then + // pass the original flow file down the line to trigger downstream processors + if (fileToProcess == null) { + // This can happen if any exceptions occur while setting up the connection, statement, etc. + logger.error("Unable to execute SQL select query {} due to {}. 
No FlowFile to route to failure", + new Object[]{selectQuery, e}); + context.yield(); + } else { + if (context.hasIncomingConnection()) { + logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure", + new Object[]{selectQuery, fileToProcess, e}); + fileToProcess = session.penalize(fileToProcess); + } else { + logger.error("Unable to execute SQL select query {} due to {}; routing to failure", + new Object[]{selectQuery, e}); + context.yield(); + } + session.transfer(fileToProcess, REL_FAILURE); + } + } + } + + protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java new file mode 100644 index 0000000..06df6c1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractQueryDatabaseTable.java @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; +import org.apache.nifi.processors.standard.sql.SqlWriter; +import org.apache.nifi.processors.standard.util.JdbcCommon; +import org.apache.nifi.util.StopWatch; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + + +public abstract class AbstractQueryDatabaseTable extends AbstractDatabaseFetchProcessor { + + public static final String RESULT_TABLENAME = "tablename"; + public static final String RESULT_ROW_COUNT = "querydbtable.row.count"; + + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + + public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Fetch Size") + .description("The number of result rows to be fetched from the result set at a time. This is a hint to the database driver and may not be " + + "honored and/or exact. If the value specified is zero, then the hint is ignored.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() + .name("qdbt-max-rows") + .displayName("Max Rows Per Flow File") + .description("The maximum number of result rows that will be included in a single FlowFile. This will allow you to break up very large " + + "result sets into multiple FlowFiles. 
If the value specified is zero, then all rows are returned in a single FlowFile.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder() + .name("qdbt-output-batch-size") + .displayName("Output Batch Size") + .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows " + + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles " + + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will " + + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The maxvalue.* and fragment.count attributes will not be set on FlowFiles when this " + + "property is set.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + public static final PropertyDescriptor MAX_FRAGMENTS = new PropertyDescriptor.Builder() + .name("qdbt-max-frags") + .displayName("Maximum Number of Fragments") + .description("The maximum number of fragments. If the value specified is zero, then all fragments are returned. " + + "This prevents OutOfMemoryError when this processor ingests huge table. NOTE: Setting this property can result in data loss, as the incoming results are " + + "not ordered, and fragments may end at arbitrary boundaries where rows are not included in the result set.") + .defaultValue("0") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .build(); + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .dynamic(true) + .build(); + } + + @OnScheduled + public void setup(final ProcessContext context) { + maxValueProperties = getDefaultMaxValueProperties(context, null); + } + + @OnStopped + public void stop() { + // Reset the column type map in case properties change + setupComplete.set(false); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + // Fetch the column/table info once + if (!setupComplete.get()) { + super.setup(context); + } + ProcessSession session = sessionFactory.createSession(); + final List<FlowFile> resultSetFlowFiles = new ArrayList<>(); + + final ComponentLog logger = getLogger(); + + final DBCPService dbcpService = 
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).evaluateAttributeExpressions().getValue(); + final String customWhereClause = context.getProperty(WHERE_CLAUSE).evaluateAttributeExpressions().getValue(); + final Integer fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger(); + final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); + final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField; + final Integer maxFragments = context.getProperty(MAX_FRAGMENTS).isSet() + ? context.getProperty(MAX_FRAGMENTS).evaluateAttributeExpressions().asInteger() + : 0; + + + SqlWriter sqlWriter = configureSqlWriter(session, context); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + context.yield(); + return; + } + // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually + // set as the current state map (after the session has been committed) + final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap()); + + //If an initial max value for column(s) has been specified using properties, and this column is not in the state manager, sync them to the state property map + for (final Map.Entry<String, String> maxProp : maxValueProperties.entrySet()) { + String maxPropKey = maxProp.getKey().toLowerCase(); + String fullyQualifiedMaxPropKey = getStateKey(tableName, maxPropKey, dbAdapter); + if (!statePropertyMap.containsKey(fullyQualifiedMaxPropKey)) { + String newMaxPropValue; + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name, + // but store the new initial max value under the fully-qualified key. + if (statePropertyMap.containsKey(maxPropKey)) { + newMaxPropValue = statePropertyMap.get(maxPropKey); + } else { + newMaxPropValue = maxProp.getValue(); + } + statePropertyMap.put(fullyQualifiedMaxPropKey, newMaxPropValue); + + } + } + + List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) + ? 
null + : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); + final String selectQuery = getQuery(dbAdapter, tableName, sqlQuery, columnNames, maxValueColumnNameList, customWhereClause, statePropertyMap); + final StopWatch stopWatch = new StopWatch(true); + final String fragmentIdentifier = UUID.randomUUID().toString(); + + try (final Connection con = dbcpService.getConnection(Collections.emptyMap()); + final Statement st = con.createStatement()) { + + if (fetchSize != null && fetchSize > 0) { + try { + st.setFetchSize(fetchSize); + } catch (SQLException se) { + // Not all drivers support this, just log the error (at debug level) and move on + logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se); + } + } + + String jdbcURL = "DBCPService"; + try { + DatabaseMetaData databaseMetaData = con.getMetaData(); + if (databaseMetaData != null) { + jdbcURL = databaseMetaData.getURL(); + } + } catch (SQLException se) { + // Ignore and use default JDBC URL. This shouldn't happen unless the driver doesn't implement getMetaData() properly + } + + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.SECONDS).intValue(); + st.setQueryTimeout(queryTimeout); // timeout in seconds + if (logger.isDebugEnabled()) { + logger.debug("Executing query {}", new Object[] { selectQuery }); + } + try (final ResultSet resultSet = st.executeQuery(selectQuery)) { + int fragmentIndex=0; + // Max values will be updated in the state property map by the callback + final MaxValueResultSetRowCollector maxValCollector = new MaxValueResultSetRowCollector(tableName, statePropertyMap, dbAdapter); + + while(true) { + final AtomicLong nrOfRows = new AtomicLong(0L); + + FlowFile fileToProcess = session.create(); + try { + fileToProcess = session.write(fileToProcess, out -> { + try { + nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), maxValCollector)); + } catch (Exception e) { + throw new ProcessException("Error during database query or conversion of records.", e); + } + }); + } catch (ProcessException e) { + // Add flowfile to results before rethrowing so it will be removed from session in outer catch + resultSetFlowFiles.add(fileToProcess); + throw e; + } + + if (nrOfRows.get() > 0) { + // set attributes + final Map<String, String> attributesToAdd = new HashMap<>(); + attributesToAdd.put(RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + attributesToAdd.put(RESULT_TABLENAME, tableName); + + if(maxRowsPerFlowFile > 0) { + attributesToAdd.put(FRAGMENT_ID, fragmentIdentifier); + attributesToAdd.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex)); + } + + attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); + fileToProcess = session.putAllAttributes(fileToProcess, attributesToAdd); + sqlWriter.updateCounters(session); + + logger.info("{} contains {} records; transferring to 'success'", + new Object[]{fileToProcess, nrOfRows.get()}); + + session.getProvenanceReporter().receive(fileToProcess, jdbcURL, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + resultSetFlowFiles.add(fileToProcess); + // If we've reached the batch size, send out the flow files + if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) { + session.transfer(resultSetFlowFiles, REL_SUCCESS); + session.commit(); + resultSetFlowFiles.clear(); + } + } else { + // If there were no rows returned, don't send the flowfile + session.remove(fileToProcess); + // If no rows and this was first FlowFile, yield + 
if(fragmentIndex == 0){ + context.yield(); + } + break; + } + + fragmentIndex++; + if (maxFragments > 0 && fragmentIndex >= maxFragments) { + break; + } + + // If we aren't splitting up the data into flow files or fragments, then the result set has been entirely fetched so don't loop back around + if (maxFragments == 0 && maxRowsPerFlowFile == 0) { + break; + } + + // If we are splitting up the data into flow files, don't loop back around if we've gotten all results + if(maxRowsPerFlowFile > 0 && nrOfRows.get() < maxRowsPerFlowFile) { + break; + } + } + + // Apply state changes from the Max Value tracker + maxValCollector.applyStateChanges(); + + // Even though the maximum value and total count are known at this point, to maintain consistent behavior if Output Batch Size is set, do not store the attributes + if (outputBatchSize == 0) { + for (int i = 0; i < resultSetFlowFiles.size(); i++) { + // Add maximum values as attributes + for (Map.Entry<String, String> entry : statePropertyMap.entrySet()) { + // Get just the column name from the key + String key = entry.getKey(); + String colName = key.substring(key.lastIndexOf(NAMESPACE_DELIMITER) + NAMESPACE_DELIMITER.length()); + resultSetFlowFiles.set(i, session.putAttribute(resultSetFlowFiles.get(i), "maxvalue." + colName, entry.getValue())); + } + + //set count on all FlowFiles + if (maxRowsPerFlowFile > 0) { + resultSetFlowFiles.set(i, + session.putAttribute(resultSetFlowFiles.get(i), "fragment.count", Integer.toString(fragmentIndex))); + } + } + } + } catch (final SQLException e) { + throw e; + } + + session.transfer(resultSetFlowFiles, REL_SUCCESS); + + } catch (final ProcessException | SQLException e) { + logger.error("Unable to execute SQL select query {} due to {}", new Object[]{selectQuery, e}); + if (!resultSetFlowFiles.isEmpty()) { + session.remove(resultSetFlowFiles); + } + context.yield(); + } finally { + session.commit(); + try { + // Update the state + stateManager.setState(statePropertyMap, Scope.CLUSTER); + } catch (IOException ioe) { + getLogger().error("{} failed to update State Manager, maximum observed values will not be recorded", new Object[]{this, ioe}); + } + } + } + + protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String columnNames, List<String> maxValColumnNames, + String customWhereClause, Map<String, String> stateMap) { + + return getQuery(dbAdapter, tableName, null, columnNames, maxValColumnNames, customWhereClause, stateMap); + } + + protected String getQuery(DatabaseAdapter dbAdapter, String tableName, String sqlQuery, String columnNames, List<String> maxValColumnNames, + String customWhereClause, Map<String, String> stateMap) { + if (StringUtils.isEmpty(tableName)) { + throw new IllegalArgumentException("Table name must be specified"); + } + final StringBuilder query; + + if (StringUtils.isEmpty(sqlQuery)) { + query = new StringBuilder(dbAdapter.getSelectStatement(tableName, columnNames, null, null, null, null)); + } else { + query = getWrappedQuery(dbAdapter, sqlQuery, tableName); + } + + List<String> whereClauses = new ArrayList<>(); + // Check state map for last max values + if (stateMap != null && !stateMap.isEmpty() && maxValColumnNames != null) { + IntStream.range(0, maxValColumnNames.size()).forEach((index) -> { + String colName = maxValColumnNames.get(index); + String maxValueKey = getStateKey(tableName, colName, dbAdapter); + String maxValue = stateMap.get(maxValueKey); + if (StringUtils.isEmpty(maxValue)) { + // If we can't find the value at the fully-qualified key name, it is 
possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. + maxValue = stateMap.get(colName.toLowerCase()); + } + if (!StringUtils.isEmpty(maxValue)) { + Integer type = columnTypeMap.get(maxValueKey); + if (type == null) { + // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. + throw new IllegalArgumentException("No column type found for: " + colName); + } + // Add a condition for the WHERE clause + whereClauses.add(colName + (index == 0 ? " > " : " >= ") + getLiteralByType(type, maxValue, dbAdapter.getName())); + } + }); + } + + if (customWhereClause != null) { + whereClauses.add("(" + customWhereClause + ")"); + } + + if (!whereClauses.isEmpty()) { + query.append(" WHERE "); + query.append(StringUtils.join(whereClauses, " AND ")); + } + + return query.toString(); + } + + public class MaxValueResultSetRowCollector implements JdbcCommon.ResultSetRowCallback { + DatabaseAdapter dbAdapter; + final Map<String, String> newColMap; + final Map<String, String> originalState; + String tableName; + + public MaxValueResultSetRowCollector(String tableName, Map<String, String> stateMap, DatabaseAdapter dbAdapter) { + this.dbAdapter = dbAdapter; + this.originalState = stateMap; + + this.newColMap = new HashMap<>(); + this.newColMap.putAll(stateMap); + + this.tableName = tableName; + } + + @Override + public void processRow(ResultSet resultSet) throws IOException { + if (resultSet == null) { + return; + } + try { + // Iterate over the row, check-and-set max values + final ResultSetMetaData meta = resultSet.getMetaData(); + final int nrOfColumns = meta.getColumnCount(); + if (nrOfColumns > 0) { + for (int i = 1; i <= nrOfColumns; i++) { + String colName = meta.getColumnName(i).toLowerCase(); + String fullyQualifiedMaxValueKey = getStateKey(tableName, colName, dbAdapter); + Integer type = columnTypeMap.get(fullyQualifiedMaxValueKey); + // Skip any columns we're not keeping track of or whose value is null + if (type == null || resultSet.getObject(i) == null) { + continue; + } + String maxValueString = newColMap.get(fullyQualifiedMaxValueKey); + // If we can't find the value at the fully-qualified key name, it is possible (under a previous scheme) + // the value has been stored under a key that is only the column name. Fall back to check the column name; either way, when a new + // maximum value is observed, it will be stored under the fully-qualified key from then on. 
+ if (StringUtils.isEmpty(maxValueString)) { + maxValueString = newColMap.get(colName); + } + String newMaxValueString = getMaxValueFromRow(resultSet, i, type, maxValueString, dbAdapter.getName()); + if (newMaxValueString != null) { + newColMap.put(fullyQualifiedMaxValueKey, newMaxValueString); + } + } + } + } catch (ParseException | SQLException e) { + throw new IOException(e); + } + } + + @Override + public void applyStateChanges() { + this.originalState.putAll(this.newColMap); + } + } + + protected abstract SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/c6572f04/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index df82e2e..cc6d508 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -16,22 +16,12 @@ */ package org.apache.nifi.processors.standard; -import java.nio.charset.Charset; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -41,23 +31,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.flowfile.attributes.FragmentAttributes; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.standard.util.AvroUtil.CodecType; +import org.apache.nifi.processors.standard.sql.DefaultAvroSqlWriter; +import org.apache.nifi.processors.standard.sql.SqlWriter; import org.apache.nifi.processors.standard.util.JdbcCommon; -import org.apache.nifi.util.StopWatch; +import org.apache.nifi.processors.standard.util.AvroUtil.CodecType; import static 
org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_PRECISION; import static org.apache.nifi.processors.standard.util.JdbcCommon.DEFAULT_SCALE; @@ -94,99 +77,24 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC + "'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.") }) @WritesAttributes({ - @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), - @WritesAttribute(attribute="executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"), - @WritesAttribute(attribute="executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"), - @WritesAttribute(attribute="executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), - @WritesAttribute(attribute="executesql.resultset.index", description = "Assuming multiple result sets are returned, " - + "the zero based index of this result set."), - @WritesAttribute(attribute="fragment.identifier", description="If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " - + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), - @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " - + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " - + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. If Output Batch Size is set, then this " - + "attribute will not be populated."), - @WritesAttribute(attribute="fragment.index", description="If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " - + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " - + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + @WritesAttribute(attribute = "executesql.row.count", description = "Contains the number of rows returned in the select query"), + @WritesAttribute(attribute = "executesql.query.duration", description = "Combined duration of the query execution time and fetch time in milliseconds"), + @WritesAttribute(attribute = "executesql.query.executiontime", description = "Duration of the query execution time in milliseconds"), + @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), + @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + + "the zero based index of this result set."), + @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), + @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " + + "FlowFiles produced by a single ResultSet. This can be used in conjunction with the " + + "fragment.identifier attribute in order to know how many FlowFiles belonged to the same incoming ResultSet. 
If Output Batch Size is set, then this " + + "attribute will not be populated."), + @WritesAttribute(attribute = "fragment.index", description = "If 'Max Rows Per Flow File' is set then the position of this FlowFile in the list of " + + "outgoing FlowFiles that were all derived from the same result set FlowFile. This can be " + + "used in conjunction with the fragment.identifier attribute to know which FlowFiles originated from the same query result set and in what order " + "FlowFiles were produced") }) -public class ExecuteSQL extends AbstractProcessor { - - public static final String RESULT_ROW_COUNT = "executesql.row.count"; - public static final String RESULT_QUERY_DURATION = "executesql.query.duration"; - public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime"; - public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; - public static final String RESULTSET_INDEX = "executesql.resultset.index"; - - public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); - public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); - public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); - - // Relationships - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Successfully created FlowFile from SQL query result set.") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("SQL query execution failed. Incoming FlowFile will be penalized and routed to this relationship") - .build(); - private final Set<Relationship> relationships; - - public static final PropertyDescriptor DBCP_SERVICE = new PropertyDescriptor.Builder() - .name("Database Connection Pooling Service") - .description("The Controller Service that is used to obtain connection to database") - .required(true) - .identifiesControllerService(DBCPService.class) - .build(); - - public static final PropertyDescriptor SQL_SELECT_QUERY = new PropertyDescriptor.Builder() - .name("SQL select query") - .description("The SQL select query to execute. The query can be empty, a constant value, or built from attributes " - + "using Expression Language. If this property is specified, it will be used regardless of the content of " - + "incoming flowfiles. If this property is empty, the content of the incoming flow file is expected " - + "to contain a valid SQL select query, to be issued by the processor to the database. Note that Expression " - + "Language is not evaluated for flow file contents.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder() - .name("Max Wait Time") - .description("The maximum amount of time allowed for a running SQL select query " - + " , zero means there is no limit. Max time less than 1 second will be equal to zero.") - .defaultValue("0 seconds") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .sensitive(false) - .build(); - - public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new PropertyDescriptor.Builder() - .name("esql-max-rows") - .displayName("Max Rows Per Flow File") - .description("The maximum number of result rows that will be included in a single FlowFile. 
This will allow you to break up very large "
-                    + "result sets into multiple FlowFiles. If the value specified is zero, then all rows are returned in a single FlowFile.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-
-    public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("esql-output-batch-size")
-            .displayName("Output Batch Size")
-            .description("The number of output FlowFiles to queue before committing the process session. When set to zero, the session will be committed when all result set rows "
-                    + "have been processed and the output FlowFiles are ready for transfer to the downstream relationship. For large result sets, this can cause a large burst of FlowFiles "
-                    + "to be transferred at the end of processor execution. If this property is set, then when the specified number of FlowFiles are ready for transfer, then the session will "
-                    + "be committed, thus releasing the FlowFiles to the downstream relationship. NOTE: The fragment.count attribute will not be set on FlowFiles when this "
-                    + "property is set.")
-            .defaultValue("0")
-            .required(true)
-            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
+public class ExecuteSQL extends AbstractExecuteSQL {
 
     public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
             .name("compression-format")
@@ -198,8 +106,6 @@ public class ExecuteSQL extends AbstractProcessor {
             .required(true)
             .build();
 
-    private final List<PropertyDescriptor> propDescriptors;
-
     public ExecuteSQL() {
         final Set<Relationship> r = new HashSet<>();
         r.add(REL_SUCCESS);
@@ -212,248 +118,31 @@ public class ExecuteSQL extends AbstractProcessor {
         pds.add(QUERY_TIMEOUT);
         pds.add(NORMALIZE_NAMES_FOR_AVRO);
         pds.add(USE_AVRO_LOGICAL_TYPES);
+        pds.add(COMPRESSION_FORMAT);
         pds.add(DEFAULT_PRECISION);
         pds.add(DEFAULT_SCALE);
         pds.add(MAX_ROWS_PER_FLOW_FILE);
         pds.add(OUTPUT_BATCH_SIZE);
-        pds.add(COMPRESSION_FORMAT);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return propDescriptors;
-    }
-
-    @OnScheduled
-    public void setup(ProcessContext context) {
-        // If the query is not set, then an incoming flow file is needed. Otherwise fail the initialization
-        if (!context.getProperty(SQL_SELECT_QUERY).isSet() && !context.hasIncomingConnection()) {
-            final String errorString = "Either the Select Query must be specified or there must be an incoming connection "
-                    + "providing flowfile(s) containing a SQL select query";
-            getLogger().error(errorString);
-            throw new ProcessException(errorString);
-        }
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile fileToProcess = null;
-        if (context.hasIncomingConnection()) {
-            fileToProcess = session.get();
-
-            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
-            // However, if we have no FlowFile and we have connections coming from other Processors, then
-            // we know that we should run only if we have a FlowFile.
-            if (fileToProcess == null && context.hasNonLoopConnection()) {
-                return;
-            }
-        }
-
-        final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
-
-        final ComponentLog logger = getLogger();
-        final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
-        final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+    protected SqlWriter configureSqlWriter(ProcessSession session, ProcessContext context, FlowFile fileToProcess) {
         final boolean convertNamesForAvro = context.getProperty(NORMALIZE_NAMES_FOR_AVRO).asBoolean();
         final Boolean useAvroLogicalTypes = context.getProperty(USE_AVRO_LOGICAL_TYPES).asBoolean();
         final Integer maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
-        final Integer outputBatchSizeField = context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final int outputBatchSize = outputBatchSizeField == null ? 0 : outputBatchSizeField;
         final Integer defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(fileToProcess).asInteger();
         final Integer defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(fileToProcess).asInteger();
         final String codec = context.getProperty(COMPRESSION_FORMAT).getValue();
 
-        final String selectQuery;
-        if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
-            selectQuery = context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-        } else {
-            // If the query is not set, then an incoming flow file is required, and expected to contain a valid SQL select query.
-            // If there is no incoming connection, onTrigger will not be called as the processor will fail when scheduled.
-            final StringBuilder queryContents = new StringBuilder();
-            session.read(fileToProcess, in -> queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
-            selectQuery = queryContents.toString();
-        }
-
-        int resultCount=0;
-        try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes());
-            final PreparedStatement st = con.prepareStatement(selectQuery)) {
-            st.setQueryTimeout(queryTimeout); // timeout in seconds
-
-            if (fileToProcess != null) {
-                JdbcCommon.setParameters(st, fileToProcess.getAttributes());
-            }
-            logger.debug("Executing query {}", new Object[]{selectQuery});
-
-            int fragmentIndex=0;
-            final String fragmentId = UUID.randomUUID().toString();
-
-            final StopWatch executionTime = new StopWatch(true);
-
-            boolean hasResults = st.execute();
-
-            long executionTimeElapsed = executionTime.getElapsed(TimeUnit.MILLISECONDS);
-
-            boolean hasUpdateCount = st.getUpdateCount() != -1;
-
-            while(hasResults || hasUpdateCount) {
-                //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet
-                if (hasResults) {
-                    final AtomicLong nrOfRows = new AtomicLong(0L);
-
-                    try {
-                        final ResultSet resultSet = st.getResultSet();
-                        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
-                                .convertNames(convertNamesForAvro)
-                                .useLogicalTypes(useAvroLogicalTypes)
-                                .defaultPrecision(defaultPrecision)
-                                .defaultScale(defaultScale)
-                                .maxRows(maxRowsPerFlowFile)
-                                .codecFactory(codec)
-                                .build();
-
-                        do {
-                            final StopWatch fetchTime = new StopWatch(true);
-
-                            FlowFile resultSetFF;
-                            if (fileToProcess == null) {
-                                resultSetFF = session.create();
-                            } else {
-                                resultSetFF = session.create(fileToProcess);
-                                resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
-                            }
-
-                            try {
-                                resultSetFF = session.write(resultSetFF, out -> {
-                                    try {
-                                        nrOfRows.set(JdbcCommon.convertToAvroStream(resultSet, out, options, null));
-                                    } catch (SQLException e) {
-                                        throw new ProcessException(e);
-                                    }
-                                });
-
-                                long fetchTimeElapsed = fetchTime.getElapsed(TimeUnit.MILLISECONDS);
-
-                                // set attribute how many rows were selected
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_DURATION, String.valueOf(executionTimeElapsed + fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
-                                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                                resultSetFF = session.putAttribute(resultSetFF, RESULTSET_INDEX, String.valueOf(resultCount));
-
-                                // if fragmented ResultSet, determine if we should keep this fragment; set fragment attributes
-                                if (maxRowsPerFlowFile > 0) {
-                                    // if row count is zero and this is not the first fragment, drop it instead of committing it.
-                                    if (nrOfRows.get() == 0 && fragmentIndex > 0) {
-                                        session.remove(resultSetFF);
-                                        break;
-                                    }
-
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
-                                    resultSetFF = session.putAttribute(resultSetFF, FRAGMENT_INDEX, String.valueOf(fragmentIndex));
-                                }
-
-                                logger.info("{} contains {} Avro records; transferring to 'success'",
-                                        new Object[]{resultSetFF, nrOfRows.get()});
-                                session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " + nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
-                                resultSetFlowFiles.add(resultSetFF);
-
-                                // If we've reached the batch size, send out the flow files
-                                if (outputBatchSize > 0 && resultSetFlowFiles.size() >= outputBatchSize) {
-                                    session.transfer(resultSetFlowFiles, REL_SUCCESS);
-                                    session.commit();
-                                    resultSetFlowFiles.clear();
-                                }
-
-                                fragmentIndex++;
-                            } catch (Exception e) {
-                                // Remove the result set flow file and propagate the exception
-                                session.remove(resultSetFF);
-                                if (e instanceof ProcessException) {
-                                    throw (ProcessException) e;
-                                } else {
-                                    throw new ProcessException(e);
-                                }
-                            }
-                        } while (maxRowsPerFlowFile > 0 && nrOfRows.get() == maxRowsPerFlowFile);
-
-                        // If we are splitting results but not outputting batches, set count on all FlowFiles
-                        if (outputBatchSize == 0 && maxRowsPerFlowFile > 0) {
-                            for (int i = 0; i < resultSetFlowFiles.size(); i++) {
-                                resultSetFlowFiles.set(i,
-                                        session.putAttribute(resultSetFlowFiles.get(i), FRAGMENT_COUNT, Integer.toString(fragmentIndex)));
-                            }
-                        }
-                    } catch (final SQLException e) {
-                        throw new ProcessException(e);
-                    }
-
-                    resultCount++;
-                }
-
-                // are there anymore result sets?
-                try{
-                    hasResults = st.getMoreResults(Statement.CLOSE_CURRENT_RESULT);
-                    hasUpdateCount = st.getUpdateCount() != -1;
-                } catch(SQLException ex){
-                    hasResults = false;
-                    hasUpdateCount = false;
-                }
-            }
-
-            // Transfer any remaining files to SUCCESS
-            session.transfer(resultSetFlowFiles, REL_SUCCESS);
-            resultSetFlowFiles.clear();
-
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            // pass the original flow file down the line to trigger downstream processors
-            if(fileToProcess != null){
-                if(resultCount > 0){
-                    session.remove(fileToProcess);
-                } else {
-                    fileToProcess = session.write(fileToProcess, JdbcCommon::createEmptyAvroStream);
-
-                    fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, "0");
-                    fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                    session.transfer(fileToProcess, REL_SUCCESS);
-                }
-            } else if(resultCount == 0){
-                //If we had no inbound FlowFile, no exceptions, and the SQL generated no result sets (Insert/Update/Delete statements only)
-                // Then generate an empty Output FlowFile
-                FlowFile resultSetFF = session.create();
-
-                resultSetFF = session.write(resultSetFF, out -> JdbcCommon.createEmptyAvroStream(out));
-
-                resultSetFF = session.putAttribute(resultSetFF, RESULT_ROW_COUNT, "0");
-                resultSetFF = session.putAttribute(resultSetFF, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY);
-                session.transfer(resultSetFF, REL_SUCCESS);
-            }
-        } catch (final ProcessException | SQLException e) {
-            //If we had at least one result then it's OK to drop the original file, but if we had no results then
-            // pass the original flow file down the line to trigger downstream processors
-            if (fileToProcess == null) {
-                // This can happen if any exceptions occur while setting up the connection, statement, etc.
-                logger.error("Unable to execute SQL select query {} due to {}. No FlowFile to route to failure",
-                        new Object[]{selectQuery, e});
-                context.yield();
-            } else {
-                if (context.hasIncomingConnection()) {
-                    logger.error("Unable to execute SQL select query {} for {} due to {}; routing to failure",
-                            new Object[]{selectQuery, fileToProcess, e});
-                    fileToProcess = session.penalize(fileToProcess);
-                } else {
-                    logger.error("Unable to execute SQL select query {} due to {}; routing to failure",
-                            new Object[]{selectQuery, e});
-                    context.yield();
-                }
-                session.transfer(fileToProcess, REL_FAILURE);
-            }
-        }
-    }
+        final JdbcCommon.AvroConversionOptions options = JdbcCommon.AvroConversionOptions.builder()
+                .convertNames(convertNamesForAvro)
+                .useLogicalTypes(useAvroLogicalTypes)
+                .defaultPrecision(defaultPrecision)
+                .defaultScale(defaultScale)
+                .maxRows(maxRowsPerFlowFile)
+                .codecFactory(codec)
+                .build();
+        return new DefaultAvroSqlWriter(options);
     }
 }
