Repository: nifi Updated Branches: refs/heads/master 0536c3edf -> 458c987fe
NIFI-4355 - query execution time as attribute of ExecuteSQL Signed-off-by: Matthew Burgess <[email protected]> This closes #2129 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/458c987f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/458c987f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/458c987f Branch: refs/heads/master Commit: 458c987fe3396ecce460c6ae7c099ab8f8b6546d Parents: 0536c3e Author: Pierre Villard <[email protected]> Authored: Wed Sep 6 15:28:48 2017 +0200 Committer: Matthew Burgess <[email protected]> Committed: Wed Sep 6 13:29:19 2017 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/standard/ExecuteSQL.java | 13 ++++++++++--- .../nifi/processors/standard/TestExecuteSQL.java | 2 ++ 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/458c987f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 3f05766..ad79595 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -36,6 +36,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -68,10 +69,14 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC + "a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. " + "If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the " + "select query. FlowFile attribute 'executesql.row.count' indicates how many rows were selected.") -@WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query") +@WritesAttributes({ + @WritesAttribute(attribute="executesql.row.count", description = "Contains the number of rows returned in the select query"), + @WritesAttribute(attribute="executesql.query.duration", description = "Duration of the query in milliseconds") +}) public class ExecuteSQL extends AbstractProcessor { public static final String RESULT_ROW_COUNT = "executesql.row.count"; + public static final String RESULT_QUERY_DURATION = "executesql.query.duration"; // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -217,14 +222,16 @@ public class ExecuteSQL extends AbstractProcessor { } }); + long duration = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + // set attribute how many rows were selected fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get())); + fileToProcess = session.putAttribute(fileToProcess, RESULT_QUERY_DURATION, String.valueOf(duration)); fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JdbcCommon.MIME_TYPE_AVRO_BINARY); logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()}); - session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", - stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", duration); session.transfer(fileToProcess, REL_SUCCESS); } catch (final ProcessException | SQLException e) { if (fileToProcess == null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/458c987f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 5659e4a..5fd1af8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -262,6 +262,8 @@ public class TestExecuteSQL { runner.run(); runner.assertAllFlowFilesTransferred(ExecuteSQL.REL_SUCCESS, 1); + runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_QUERY_DURATION); + runner.assertAllFlowFilesContainAttribute(ExecuteSQL.REL_SUCCESS, ExecuteSQL.RESULT_ROW_COUNT); final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS); final InputStream in = new ByteArrayInputStream(flowfiles.get(0).toByteArray());
