Repository: nifi Updated Branches: refs/heads/master 8996b7f6d -> 2760b0777
NIFI-5322: Addressed bug that caused QueryRecord to fail (rollback session instead of routing to 'failure') whenever a record in the incoming FlowFile did not adhere to its schema. This happened because the InputStream for the FlowFile was not properly closed. Also updated the text of the Exception to include information from its 'cause' so user is better able to understand the underlying issue. Signed-off-by: Matthew Burgess <[email protected]> This closes #2803 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2760b077 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2760b077 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2760b077 Branch: refs/heads/master Commit: 2760b077700d31ea4a4b11c8769341cdd00e2453 Parents: 8996b7f Author: Mark Payne <[email protected]> Authored: Tue Jun 19 14:46:01 2018 -0400 Committer: Matthew Burgess <[email protected]> Committed: Tue Jun 19 16:01:33 2018 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/QueryRecord.java | 17 ++++++++++-- .../nifi/queryrecord/FlowFileEnumerator.java | 2 +- .../processors/standard/TestQueryRecord.java | 27 ++++++++++++++++++++ 3 files changed, 43 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 2412736..7d9981b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -467,7 +467,13 @@ public class QueryRecord extends AbstractProcessor { final FlowFileTable<?, ?> table = cachedStatement.getTable(); table.setFlowFile(session, flowFile); - final ResultSet rs = stmt.executeQuery(); + final ResultSet rs; + try { + rs = stmt.executeQuery(); + } catch (final Throwable t) { + table.close(); + throw t; + } return new QueryResult() { @Override @@ -516,11 +522,18 @@ public class QueryRecord extends AbstractProcessor { rootSchema.setCacheEnabled(false); statement = connection.createStatement(); - resultSet = statement.executeQuery(sql); + + try { + resultSet = statement.executeQuery(sql); + } catch (final Throwable t) { + flowFileTable.close(); + throw t; + } final ResultSet rs = resultSet; final Statement stmt = statement; final Connection conn = connection; + return new QueryResult() { @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java index e7b2c26..963b85e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java @@ -62,7 +62,7 @@ public class FlowFileEnumerator<InternalType> implements Enumerator<Object> { currentRow = filterColumns(recordParser.nextRecord()); break; } catch (final Exception e) { - throw new ProcessException("Failed to read next record in stream for " + flowFile, e); + throw new ProcessException("Failed to read next record in stream for " + flowFile + " due to " + e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2760b077/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 345f8e4..b266b47 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -71,6 +71,33 @@ public class TestQueryRecord { } @Test + public void testStreamClosedWhenBadData() throws InitializationException { + final MockRecordParser parser = new MockRecordParser(); + parser.failAfter(0); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("age", RecordFieldType.INT); + parser.addRecord("Tom", 49); + + final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + + TestRunner runner = getRunner(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''"); + runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer"); + + runner.enqueue(new byte[0]); + + runner.run(); + + runner.assertTransferCount(QueryRecord.REL_FAILURE, 1); + } + + @Test public void testSimple() throws InitializationException, IOException, SQLException { final MockRecordParser parser = new MockRecordParser(); parser.addSchemaField("name", RecordFieldType.STRING);
