Repository: nifi
Updated Branches:
  refs/heads/master 8996b7f6d -> 2760b0777


NIFI-5322: Addressed bug that caused QueryRecord to fail (rollback session 
instead of routing to 'failure') whenever a record in the incoming FlowFile did 
not adhere to its schema. This happened because the InputStream for the 
FlowFile was not properly closed. Also updated the text of the Exception to 
include information from its 'cause' so user is better able to understand the 
underlying issue.

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2803


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2760b077
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2760b077
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2760b077

Branch: refs/heads/master
Commit: 2760b077700d31ea4a4b11c8769341cdd00e2453
Parents: 8996b7f
Author: Mark Payne <[email protected]>
Authored: Tue Jun 19 14:46:01 2018 -0400
Committer: Matthew Burgess <[email protected]>
Committed: Tue Jun 19 16:01:33 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/QueryRecord.java   | 17 ++++++++++--
 .../nifi/queryrecord/FlowFileEnumerator.java    |  2 +-
 .../processors/standard/TestQueryRecord.java    | 27 ++++++++++++++++++++
 3 files changed, 43 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 2412736..7d9981b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -467,7 +467,13 @@ public class QueryRecord extends AbstractProcessor {
         final FlowFileTable<?, ?> table = cachedStatement.getTable();
         table.setFlowFile(session, flowFile);
 
-        final ResultSet rs = stmt.executeQuery();
+        final ResultSet rs;
+        try {
+            rs = stmt.executeQuery();
+        } catch (final Throwable t) {
+            table.close();
+            throw t;
+        }
 
         return new QueryResult() {
             @Override
@@ -516,11 +522,18 @@ public class QueryRecord extends AbstractProcessor {
             rootSchema.setCacheEnabled(false);
 
             statement = connection.createStatement();
-            resultSet = statement.executeQuery(sql);
+
+            try {
+                resultSet = statement.executeQuery(sql);
+            } catch (final Throwable t) {
+                flowFileTable.close();
+                throw t;
+            }
 
             final ResultSet rs = resultSet;
             final Statement stmt = statement;
             final Connection conn = connection;
+
             return new QueryResult() {
                 @Override
                 public void close() throws IOException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
index e7b2c26..963b85e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
@@ -62,7 +62,7 @@ public class FlowFileEnumerator<InternalType> implements 
Enumerator<Object> {
                 currentRow = filterColumns(recordParser.nextRecord());
                 break;
             } catch (final Exception e) {
-                throw new ProcessException("Failed to read next record in 
stream for " + flowFile, e);
+                throw new ProcessException("Failed to read next record in 
stream for " + flowFile + " due to " + e.getMessage(), e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 345f8e4..b266b47 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -71,6 +71,33 @@ public class TestQueryRecord {
     }
 
     @Test
+    public void testStreamClosedWhenBadData() throws InitializationException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.failAfter(0);
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+        parser.addRecord("Tom", 49);
+
+        final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE 
name <> ''");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(QueryRecord.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testSimple() throws InitializationException, IOException, 
SQLException {
         final MockRecordParser parser = new MockRecordParser();
         parser.addSchemaField("name", RecordFieldType.STRING);

Reply via email to