Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2868#discussion_r201458637
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryDatabaseTable.java
 ---
    @@ -543,5 +550,10 @@ public void processRow(ResultSet resultSet) throws 
IOException {
                     throw new IOException(e);
                 }
             }
    +
    +        @Override
    +        public void rollbackStateChanges() {
    +            this.newColMap = this.originalState;
    --- End diff --
    
    This only updates the reference, not the actual map pointed to by it (i.e. 
the one that's passed in). Maybe instead we use originalState as the reference 
to stateMap, and have "updatedStateMap" or something be the copy of the input 
map, and update that instead of the passed in map. If all succeeds, then we can 
call `originalState.putAll(updatedStateMap)`, if anything fails then the input 
map is intact.
    
    I found this when adding a unit test that illustrates the error (and fix), 
not sure if it's finished/correct but please feel free to incorporate it if you 
like:
    
    ```
    @Test
    public void testWithExceptionAfterSomeRowsProcessed() throws SQLException {
            // load test data to database
            final Connection con = ((DBCPService) 
runner.getControllerService("dbcp")).getConnection();
            Statement stmt = con.createStatement();
    
            try {
                stmt.execute("drop table TEST_NULL_INT");
            } catch (final SQLException sqle) {
                // Ignore, usually due to Derby not having DROP TABLE IF EXISTS
            }
    
            stmt.execute("create table TEST_NULL_INT (id integer not null, val1 
integer, val2 integer, constraint my_pk primary key (id))");
    
            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 
NULL, 1)");
            stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (2, 
1, 1)");
    
            runner.setIncomingConnection(false);
            runner.setProperty(QueryDatabaseTable.TABLE_NAME, "TEST_NULL_INT");
            
runner.setProperty(AbstractDatabaseFetchProcessor.MAX_VALUE_COLUMN_NAMES, "id");
    
            // Override adapter with one that fails after the first row is 
processed
            QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), new 
GenericDatabaseAdapter() {
                boolean fail = false;
                @Override
                public String getName() {
                    if(!fail) {
                        fail = true;
                        return super.getName();
                    }
                    throw new DataFileWriter.AppendWriteException(null);
                }
            });
            runner.run();
            
assertTrue(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
            // State should not have been updated
            runner.getStateManager().assertStateNotSet("test_null_int@!@id", 
Scope.CLUSTER);
    
            // Restore original (working) adapter and run again
            QueryDatabaseTable.dbAdapters.put(dbAdapter.getName(), dbAdapter);
            runner.run();
            
assertFalse(runner.getFlowFilesForRelationship(QueryDatabaseTable.REL_SUCCESS).isEmpty());
            runner.getStateManager().assertStateEquals("test_null_int@!@id", 
"2", Scope.CLUSTER);
        }
    ```


---

Reply via email to