[
https://issues.apache.org/jira/browse/NIFI-4517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613915#comment-16613915
]
ASF GitHub Bot commented on NIFI-4517:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2945#discussion_r217492630
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
---
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.dbcp.DBCPService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.sql.SqlWriter;
+import org.apache.nifi.processors.standard.util.JdbcCommon;
+import org.apache.nifi.util.StopWatch;
+
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public abstract class AbstractExecuteSQL extends AbstractProcessor {
+
+ public static final String RESULT_ROW_COUNT = "executesql.row.count";
+ public static final String RESULT_QUERY_DURATION =
"executesql.query.duration";
+ public static final String RESULT_QUERY_EXECUTION_TIME =
"executesql.query.executiontime";
+ public static final String RESULT_QUERY_FETCH_TIME =
"executesql.query.fetchtime";
+ public static final String RESULTSET_INDEX =
"executesql.resultset.index";
+
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+
+ // Relationships
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("Successfully created FlowFile from SQL query
result set.")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("SQL query execution failed. Incoming FlowFile
will be penalized and routed to this relationship")
+ .build();
+ protected Set<Relationship> relationships;
+
+ public static final PropertyDescriptor DBCP_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Database Connection Pooling Service")
+ .description("The Controller Service that is used to obtain
connection to database")
+ .required(true)
+ .identifiesControllerService(DBCPService.class)
+ .build();
+
+ public static final PropertyDescriptor SQL_SELECT_QUERY = new
PropertyDescriptor.Builder()
+ .name("SQL select query")
+ .description("The SQL select query to execute. The query can
be empty, a constant value, or built from attributes "
+ + "using Expression Language. If this property is
specified, it will be used regardless of the content of "
+ + "incoming flowfiles. If this property is empty, the
content of the incoming flow file is expected "
+ + "to contain a valid SQL select query, to be issued
by the processor to the database. Note that Expression "
+ + "Language is not evaluated for flow file contents.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor QUERY_TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Max Wait Time")
+ .description("The maximum amount of time allowed for a running
SQL select query "
+ + " , zero means there is no limit. Max time less than
1 second will be equal to zero.")
+ .defaultValue("0 seconds")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .sensitive(false)
+ .build();
+
+ public static final PropertyDescriptor MAX_ROWS_PER_FLOW_FILE = new
PropertyDescriptor.Builder()
+ .name("esql-max-rows")
+ .displayName("Max Rows Per Flow File")
+ .description("The maximum number of result rows that will be
included in a single FlowFile. This will allow you to break up very large "
+ + "result sets into multiple FlowFiles. If the value
specified is zero, then all rows are returned in a single FlowFile.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor OUTPUT_BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("esql-output-batch-size")
+ .displayName("Output Batch Size")
+ .description("The number of output FlowFiles to queue before
committing the process session. When set to zero, the session will be committed
when all result set rows "
+ + "have been processed and the output FlowFiles are
ready for transfer to the downstream relationship. For large result sets, this
can cause a large burst of FlowFiles "
+ + "to be transferred at the end of processor
execution. If this property is set, then when the specified number of FlowFiles
are ready for transfer, then the session will "
+ + "be committed, thus releasing the FlowFiles to the
downstream relationship. NOTE: The fragment.count attribute will not be set on
FlowFiles when this "
+ + "property is set.")
+ .defaultValue("0")
+ .required(true)
+
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected List<PropertyDescriptor> propDescriptors;
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return propDescriptors;
+ }
+
+ @OnScheduled
+ public void setup(ProcessContext context) {
+ // If the query is not set, then an incoming flow file is needed.
Otherwise fail the initialization
+ if (!context.getProperty(SQL_SELECT_QUERY).isSet() &&
!context.hasIncomingConnection()) {
+ final String errorString = "Either the Select Query must be
specified or there must be an incoming connection "
+ + "providing flowfile(s) containing a SQL select
query";
+ getLogger().error(errorString);
+ throw new ProcessException(errorString);
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ FlowFile fileToProcess = null;
+ if (context.hasIncomingConnection()) {
+ fileToProcess = session.get();
+
+ // If we have no FlowFile, and all incoming connections are
self-loops then we can continue on.
+ // However, if we have no FlowFile and we have connections
coming from other Processors, then
+ // we know that we should run only if we have a FlowFile.
+ if (fileToProcess == null && context.hasNonLoopConnection()) {
+ return;
+ }
+ }
+
+ final List<FlowFile> resultSetFlowFiles = new ArrayList<>();
+
+ final ComponentLog logger = getLogger();
+ final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final Integer queryTimeout =
context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue();
+ final Integer maxRowsPerFlowFile =
context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+ final Integer outputBatchSizeField =
context.getProperty(OUTPUT_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ final int outputBatchSize = outputBatchSizeField == null ? 0 :
outputBatchSizeField;
+
+ SqlWriter sqlWriter = configureSqlWriter(session, context,
fileToProcess);
+
+ final String selectQuery;
+ if (context.getProperty(SQL_SELECT_QUERY).isSet()) {
+ selectQuery =
context.getProperty(SQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
+ } else {
+ // If the query is not set, then an incoming flow file is
required, and expected to contain a valid SQL select query.
+ // If there is no incoming connection, onTrigger will not be
called as the processor will fail when scheduled.
+ final StringBuilder queryContents = new StringBuilder();
+ session.read(fileToProcess, in ->
queryContents.append(IOUtils.toString(in, Charset.defaultCharset())));
+ selectQuery = queryContents.toString();
+ }
+
+ int resultCount = 0;
+ try (final Connection con =
dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() :
fileToProcess.getAttributes());
+ final PreparedStatement st =
con.prepareStatement(selectQuery)) {
+ st.setQueryTimeout(queryTimeout); // timeout in seconds
+
+ if (fileToProcess != null) {
+ JdbcCommon.setParameters(st,
fileToProcess.getAttributes());
+ }
+ logger.debug("Executing query {}", new Object[]{selectQuery});
+
+ int fragmentIndex = 0;
+ final String fragmentId = UUID.randomUUID().toString();
+
+ final StopWatch executionTime = new StopWatch(true);
+
+ boolean hasResults = st.execute();
+
+ long executionTimeElapsed =
executionTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ boolean hasUpdateCount = st.getUpdateCount() != -1;
+
+ while (hasResults || hasUpdateCount) {
+ //getMoreResults() and execute() return false to indicate
that the result of the statement is just a number and not a ResultSet
+ if (hasResults) {
+ final AtomicLong nrOfRows = new AtomicLong(0L);
+
+ try {
+ final ResultSet resultSet = st.getResultSet();
+ do {
+ final StopWatch fetchTime = new
StopWatch(true);
+
+ FlowFile resultSetFF;
+ if (fileToProcess == null) {
+ resultSetFF = session.create();
+ } else {
+ resultSetFF =
session.create(fileToProcess);
+ resultSetFF =
session.putAllAttributes(resultSetFF, fileToProcess.getAttributes());
+ }
+
+ try {
+ resultSetFF = session.write(resultSetFF,
out -> {
+ try {
+
nrOfRows.set(sqlWriter.writeResultSet(resultSet, out, getLogger(), null));
+ } catch (Exception e) {
+ throw (e instanceof
ProcessException) ? (ProcessException) e : new ProcessException(e);
+ }
+ });
+
+ long fetchTimeElapsed =
fetchTime.getElapsed(TimeUnit.MILLISECONDS);
+
+ // set attributes
+ final Map<String, String> attributesToAdd
= new HashMap<>();
+ attributesToAdd.put(RESULT_ROW_COUNT,
String.valueOf(nrOfRows.get()));
+ attributesToAdd.put(RESULT_QUERY_DURATION,
String.valueOf(executionTimeElapsed + fetchTimeElapsed));
+
attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME,
String.valueOf(executionTimeElapsed));
+
attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed));
+ attributesToAdd.put(RESULTSET_INDEX,
String.valueOf(resultCount));
+
attributesToAdd.putAll(sqlWriter.getAttributesToAdd());
+ resultSetFF =
session.putAllAttributes(resultSetFF, attributesToAdd);
+ sqlWriter.updateCounters(session);
+
+ // if fragmented ResultSet, determine if
we should keep this fragment; set fragment attributes
+ if (maxRowsPerFlowFile > 0) {
+ // if row count is zero and this is
not the first fragment, drop it instead of committing it.
+ if (nrOfRows.get() == 0 &&
fragmentIndex > 0) {
+ session.remove(resultSetFF);
+ break;
+ }
+
+ resultSetFF =
session.putAttribute(resultSetFF, FRAGMENT_ID, fragmentId);
+ resultSetFF =
session.putAttribute(resultSetFF, FRAGMENT_INDEX,
String.valueOf(fragmentIndex));
+ }
+
+ logger.info("{} contains {} records;
transferring to 'success'",
+ new Object[]{resultSetFF,
nrOfRows.get()});
+
session.getProvenanceReporter().modifyContent(resultSetFF, "Retrieved " +
nrOfRows.get() + " rows", executionTimeElapsed + fetchTimeElapsed);
--- End diff --
Maybe if there were no incoming flow file, it should be a receive? I'll
check the history on this.
> Allow SQL results to be output as records in any supported format
> -----------------------------------------------------------------
>
> Key: NIFI-4517
> URL: https://issues.apache.org/jira/browse/NIFI-4517
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Priority: Major
>
> ExecuteSQL and QueryDatabaseTable currently only output Avro, and the schema
> is only available as embedded within the flow file, not as an attribute that
> record-aware processors can handle.
> ExecuteSQL and QueryDatabaseTable processors should be updated with a
> RecordSetWriter implementation. This will allow output using any writer
> format (Avro, JSON, CSV, Free-form text, etc.), as well as all the other
> features therein (such as writing the schema to an attribute), and will avoid
> the need for a ConvertAvroToXYZ or ConvertRecord processor downstream.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)