Repository: nifi Updated Branches: refs/heads/NIFI-5744_a [created] 9358a60d3
NIFI-5744: Put exception message to attribute while ExecuteSQL fail This closes #3107. Signed-off-by: Peter Wicks <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9358a60d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9358a60d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9358a60d Branch: refs/heads/NIFI-5744_a Commit: 9358a60d33be966233d80b00c07928ccb17c4059 Parents: 7bcf9fc Author: yjhyjhyjh0 <[email protected]> Authored: Wed Oct 24 21:22:06 2018 +0800 Committer: Peter Wicks <[email protected]> Committed: Tue Nov 20 08:56:15 2018 -0700 ---------------------------------------------------------------------- .../processors/standard/AbstractExecuteSQL.java | 2 + .../nifi/processors/standard/ExecuteSQL.java | 2 + .../processors/standard/ExecuteSQLRecord.java | 2 + .../processors/standard/TestExecuteSQL.java | 7 +++- .../standard/TestExecuteSQLRecord.java | 43 ++++++++++++++++++++ 5 files changed, 55 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index d1fabef..76e36fd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -61,6 +61,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { public static final String RESULT_QUERY_EXECUTION_TIME = "executesql.query.executiontime"; public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; public static final String RESULTSET_INDEX = "executesql.resultset.index"; + public static final String RESULT_ERROR_MESSAGE = "executesql.error.message"; public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); @@ -402,6 +403,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { new Object[]{selectQuery, e}); context.yield(); } + session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage()); session.transfer(fileToProcess, REL_FAILURE); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 99e0d2a..cfdef29 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -87,6 +87,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC + "If 'Max Rows Per Flow File' is set, then this number will reflect only the fetch time for the rows in the Flow File instead of the entire result set."), @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero based index of this result set."), + @WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes " + + "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java index 5a84458..80d33c0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java @@ -79,6 +79,8 @@ import static org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero based index of this result set."), + @WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes " + + "an Exception, the Flow File is routed to failure and this attribute is set to the exception message."), @WritesAttribute(attribute = "fragment.identifier", description = "If 'Max Rows Per Flow File' is set then all FlowFiles from the same query result set " + "will have the same value for the fragment.identifier attribute. This can then be used to correlate the results."), @WritesAttribute(attribute = "fragment.count", description = "If 'Max Rows Per Flow File' is set then this is the total number of " http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java index 199bd94..35dfe76 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java @@ -52,6 +52,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -463,7 +464,7 @@ public class TestExecuteSQL { ResultSet rs = mock(ResultSet.class); when(statement.getResultSet()).thenReturn(rs); // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created. - when(rs.getMetaData()).thenThrow(SQLException.class); + when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed")); runner.addControllerService("mockdbcp", dbcp, new HashMap<>()); runner.enableControllerService(dbcp); @@ -475,6 +476,10 @@ public class TestExecuteSQL { runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1); runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0); + + // Assert exception message has been put to flow file attribute + MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0); + Assert.assertEquals("java.sql.SQLException: test execute statement failed",failedFlowFile.getAttribute(ExecuteSQL.RESULT_ERROR_MESSAGE)); } public void invokeOnTrigger(final Integer queryTimeout, final String query, final boolean incomingFlowFile, final Map<String,String> attrs, final boolean setQueryProperty) http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java index 03cdbfc..b0f5cda 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java @@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -38,6 +39,8 @@ import java.io.File; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.HashMap; @@ -45,6 +48,10 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestExecuteSQLRecord { @@ -350,6 +357,42 @@ public class TestExecuteSQLRecord { assertEquals(durationTime, fetchTime + executionTime); } + @SuppressWarnings("unchecked") + @Test + public void testWithSqlExceptionErrorProcessingResultSet() throws Exception { + DBCPService dbcp = mock(DBCPService.class); + Connection conn = mock(Connection.class); + when(dbcp.getConnection(any(Map.class))).thenReturn(conn); + when(dbcp.getIdentifier()).thenReturn("mockdbcp"); + PreparedStatement statement = mock(PreparedStatement.class); + when(conn.prepareStatement(anyString())).thenReturn(statement); + when(statement.execute()).thenReturn(true); + ResultSet rs = mock(ResultSet.class); + when(statement.getResultSet()).thenReturn(rs); + // Throw an exception the first time you access the ResultSet, this is after the flow file to hold the results has been created. + when(rs.getMetaData()).thenThrow(new SQLException("test execute statement failed")); + + runner.addControllerService("mockdbcp", dbcp, new HashMap<>()); + runner.enableControllerService(dbcp); + runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "mockdbcp"); + + MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1); + runner.addControllerService("writer", recordWriter); + runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer"); + runner.enableControllerService(recordWriter); + + runner.setIncomingConnection(true); + runner.enqueue("SELECT 1"); + runner.run(); + + runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1); + runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0); + + // Assert exception message has been put to flow file attribute + MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).get(0); + Assert.assertEquals("java.sql.SQLException: test execute statement failed", failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE)); + } + @Test public void testPreQuery() throws Exception { // remove previous test database, if any
