Repository: nifi
Updated Branches:
  refs/heads/NIFI-5744_a [created] 9358a60d3


NIFI-5744: Put the exception message into a FlowFile attribute when ExecuteSQL fails

This closes #3107.

Signed-off-by: Peter Wicks <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9358a60d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9358a60d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9358a60d

Branch: refs/heads/NIFI-5744_a
Commit: 9358a60d33be966233d80b00c07928ccb17c4059
Parents: 7bcf9fc
Author: yjhyjhyjh0 <[email protected]>
Authored: Wed Oct 24 21:22:06 2018 +0800
Committer: Peter Wicks <[email protected]>
Committed: Tue Nov 20 08:56:15 2018 -0700

----------------------------------------------------------------------
 .../processors/standard/AbstractExecuteSQL.java |  2 +
 .../nifi/processors/standard/ExecuteSQL.java    |  2 +
 .../processors/standard/ExecuteSQLRecord.java   |  2 +
 .../processors/standard/TestExecuteSQL.java     |  7 +++-
 .../standard/TestExecuteSQLRecord.java          | 43 ++++++++++++++++++++
 5 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
index d1fabef..76e36fd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java
@@ -61,6 +61,7 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
     public static final String RESULT_QUERY_EXECUTION_TIME = 
"executesql.query.executiontime";
     public static final String RESULT_QUERY_FETCH_TIME = 
"executesql.query.fetchtime";
     public static final String RESULTSET_INDEX = "executesql.resultset.index";
+    public static final String RESULT_ERROR_MESSAGE = 
"executesql.error.message";
 
     public static final String FRAGMENT_ID = 
FragmentAttributes.FRAGMENT_ID.key();
     public static final String FRAGMENT_INDEX = 
FragmentAttributes.FRAGMENT_INDEX.key();
@@ -402,6 +403,7 @@ public abstract class AbstractExecuteSQL extends 
AbstractProcessor {
                             new Object[]{selectQuery, e});
                     context.yield();
                 }
+                
session.putAttribute(fileToProcess,RESULT_ERROR_MESSAGE,e.getMessage());
                 session.transfer(fileToProcess, REL_FAILURE);
             }
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
index 99e0d2a..cfdef29 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java
@@ -87,6 +87,8 @@ import static 
org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
                 + "If 'Max Rows Per Flow File' is set, then this number will 
reflect only the fetch time for the rows in the Flow File instead of the entire 
result set."),
         @WritesAttribute(attribute = "executesql.resultset.index", description 
= "Assuming multiple result sets are returned, "
                 + "the zero based index of this result set."),
+        @WritesAttribute(attribute = "executesql.error.message", description = 
"If processing an incoming flow file causes "
+                + "an Exception, the Flow File is routed to failure and this 
attribute is set to the exception message."),
         @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Rows Per Flow File' is set then all FlowFiles from the same query result 
set "
                 + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Rows Per Flow File' is set then this is the total number of  "

http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
index 5a84458..80d33c0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQLRecord.java
@@ -79,6 +79,8 @@ import static 
org.apache.nifi.processors.standard.util.JdbcCommon.USE_AVRO_LOGIC
         @WritesAttribute(attribute = "executesql.query.fetchtime", description 
= "Duration of the result set fetch time in milliseconds"),
         @WritesAttribute(attribute = "executesql.resultset.index", description 
= "Assuming multiple result sets are returned, "
                 + "the zero based index of this result set."),
+        @WritesAttribute(attribute = "executesql.error.message", description = 
"If processing an incoming flow file causes "
+                + "an Exception, the Flow File is routed to failure and this 
attribute is set to the exception message."),
         @WritesAttribute(attribute = "fragment.identifier", description = "If 
'Max Rows Per Flow File' is set then all FlowFiles from the same query result 
set "
                 + "will have the same value for the fragment.identifier 
attribute. This can then be used to correlate the results."),
         @WritesAttribute(attribute = "fragment.count", description = "If 'Max 
Rows Per Flow File' is set then this is the total number of  "

http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
index 199bd94..35dfe76 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
@@ -52,6 +52,7 @@ import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -463,7 +464,7 @@ public class TestExecuteSQL {
         ResultSet rs = mock(ResultSet.class);
         when(statement.getResultSet()).thenReturn(rs);
         // Throw an exception the first time you access the ResultSet, this is 
after the flow file to hold the results has been created.
-        when(rs.getMetaData()).thenThrow(SQLException.class);
+        when(rs.getMetaData()).thenThrow(new SQLException("test execute 
statement failed"));
 
         runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
         runner.enableControllerService(dbcp);
@@ -475,6 +476,10 @@ public class TestExecuteSQL {
 
         runner.assertTransferCount(ExecuteSQL.REL_FAILURE, 1);
         runner.assertTransferCount(ExecuteSQL.REL_SUCCESS, 0);
+
+        // Assert exception message has been put to flow file attribute
+        MockFlowFile failedFlowFile = 
runner.getFlowFilesForRelationship(ExecuteSQL.REL_FAILURE).get(0);
+        Assert.assertEquals("java.sql.SQLException: test execute statement 
failed",failedFlowFile.getAttribute(ExecuteSQL.RESULT_ERROR_MESSAGE));
     }
 
     public void invokeOnTrigger(final Integer queryTimeout, final String 
query, final boolean incomingFlowFile, final Map<String,String> attrs, final 
boolean setQueryProperty)

http://git-wip-us.apache.org/repos/asf/nifi/blob/9358a60d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
index 03cdbfc..b0f5cda 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQLRecord.java
@@ -28,6 +28,7 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,6 +39,8 @@ import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
@@ -45,6 +48,10 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestExecuteSQLRecord {
 
@@ -350,6 +357,42 @@ public class TestExecuteSQLRecord {
         assertEquals(durationTime, fetchTime + executionTime);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testWithSqlExceptionErrorProcessingResultSet() throws 
Exception {
+        DBCPService dbcp = mock(DBCPService.class);
+        Connection conn = mock(Connection.class);
+        when(dbcp.getConnection(any(Map.class))).thenReturn(conn);
+        when(dbcp.getIdentifier()).thenReturn("mockdbcp");
+        PreparedStatement statement = mock(PreparedStatement.class);
+        when(conn.prepareStatement(anyString())).thenReturn(statement);
+        when(statement.execute()).thenReturn(true);
+        ResultSet rs = mock(ResultSet.class);
+        when(statement.getResultSet()).thenReturn(rs);
+        // Throw an exception the first time you access the ResultSet, this is 
after the flow file to hold the results has been created.
+        when(rs.getMetaData()).thenThrow(new SQLException("test execute 
statement failed"));
+
+        runner.addControllerService("mockdbcp", dbcp, new HashMap<>());
+        runner.enableControllerService(dbcp);
+        runner.setProperty(AbstractExecuteSQL.DBCP_SERVICE, "mockdbcp");
+
+        MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1);
+        runner.addControllerService("writer", recordWriter);
+        runner.setProperty(ExecuteSQLRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.enableControllerService(recordWriter);
+
+        runner.setIncomingConnection(true);
+        runner.enqueue("SELECT 1");
+        runner.run();
+
+        runner.assertTransferCount(AbstractExecuteSQL.REL_FAILURE, 1);
+        runner.assertTransferCount(AbstractExecuteSQL.REL_SUCCESS, 0);
+
+        // Assert exception message has been put to flow file attribute
+        MockFlowFile failedFlowFile = 
runner.getFlowFilesForRelationship(AbstractExecuteSQL.REL_FAILURE).get(0);
+        Assert.assertEquals("java.sql.SQLException: test execute statement 
failed", failedFlowFile.getAttribute(AbstractExecuteSQL.RESULT_ERROR_MESSAGE));
+    }
+
     @Test
     public void testPreQuery() throws Exception {
         // remove previous test database, if any

Reply via email to