Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2868#discussion_r201458637
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
---
@@ -543,5 +550,10 @@ public void processRow(ResultSet resultSet) throws IOException {
throw new IOException(e);
}
}
+
+ @Override
+ public void rollbackStateChanges() {
+ this.newColMap = this.originalState;
--- End diff --
This only reassigns the reference; it doesn't restore the actual map it pointed
to (i.e. the one that's passed in), so any changes already made to that map
remain. Maybe instead we use originalState as the reference to stateMap, and
have "updatedStateMap" or something be a copy of the input map, and update that
copy instead of the passed-in map. If everything succeeds, we can call
`originalState.putAll(updatedStateMap)`; if anything fails, the input map is
left intact.
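To make the difference concrete, here is a minimal standalone sketch of the two approaches. It is not the actual QueryDatabaseTable code: the class `StateUpdateSketch`, both method names, and the `fail` flag are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names come from NiFi.
final class StateUpdateSketch {

    // Problematic variant: the "rollback" swaps a reference, but the caller's
    // map has already been mutated through the old reference.
    static void updateInPlace(Map<String, String> stateMap, String key, String value, boolean fail) {
        Map<String, String> originalState = new HashMap<>(stateMap);
        Map<String, String> newColMap = stateMap;
        newColMap.put(key, value);      // writes straight into the caller's map
        if (fail) {
            newColMap = originalState;  // only the local reference changes
        }
    }

    // Suggested variant: update a copy and commit it with putAll on success.
    static void updateViaCopy(Map<String, String> originalState, String key, String value, boolean fail) {
        Map<String, String> updatedStateMap = new HashMap<>(originalState);
        updatedStateMap.put(key, value);
        if (fail) {
            return;                     // nothing to undo, the input map was never touched
        }
        originalState.putAll(updatedStateMap);
    }

    public static void main(String[] args) {
        Map<String, String> leaky = new HashMap<>();
        leaky.put("id", "1");
        updateInPlace(leaky, "id", "2", true);
        System.out.println("in-place after failure: " + leaky.get("id")); // prints 2

        Map<String, String> safe = new HashMap<>();
        safe.put("id", "1");
        updateViaCopy(safe, "id", "2", true);
        System.out.println("copy after failure: " + safe.get("id"));      // prints 1
        updateViaCopy(safe, "id", "2", false);
        System.out.println("copy after success: " + safe.get("id"));      // prints 2
    }
}
```

With the copy-based variant there is no rollback step at all: a failure simply means the copy is discarded.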
I found the problem while adding a unit test that illustrates the error (and
the fix). I'm not sure it's finished/correct, but please feel free to
incorporate it if you like:
```
@Test
public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
    // load test data to database
    final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection();
    Statement stmt = con.createStatement();

    try {
        stmt.execute("drop table TEST_NULL_INT");
    } catch (final SQLException sqle) {
        // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
    }

    stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))");
    stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, NULL, 1)");
    stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 1, 1)");

    runner.setIncomingConnection(false);
    runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
    runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");

    // Override adapter with one that fails after the first row is processed
    QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new GenericDatabaseAdapter() {
        boolean fail = false;

        @Override
        public String getName() {
            if (!fail) {
                fail = true;
                return super.getName();
            }
            throw new DataFileWriter.AppendWriteException(null);
        }
    });
    runner.run();
    assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
    // State should not have been updated
    runner.getStateManager().assertStateNotSet("test_null_int@!@id", Scope.CLUSTER);

    // Restore original (working) adapter and run again
    QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
    runner.run();
    assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
    runner.getStateManager().assertStateEquals("test_null_int@!@id", "2", Scope.CLUSTER);
}
```
---