NIFI-3745: Fixed Table caching / primary key logic in PutDatabaseRecord

This closes #1700.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/097548da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/097548da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/097548da

Branch: refs/heads/master
Commit: 097548da9db218d251b43e73e2e189b89eef8510
Parents: d66eac2
Author: Matt Burgess <[email protected]>
Authored: Wed Apr 26 14:44:49 2017 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Thu Apr 27 08:43:13 2017 +0900

----------------------------------------------------------------------
 .../processors/standard/PutDatabaseRecord.java  | 52 ++++++-----------
 .../standard/TestPutDatabaseRecord.groovy       | 60 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
index 2797205..fd414c4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java
@@ -371,7 +371,7 @@ public class PutDatabaseRecord extends AbstractProcessor {
                 final String schemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
                 final String updateKeys = 
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(flowFile).getValue();
-                final SchemaKey schemaKey = new SchemaKey(catalog, tableName);
+                final SchemaKey schemaKey = new SchemaKey(catalog, schemaName, 
tableName);
 
                 // Get the statement type from the attribute if necessary
                 String statementType = statementTypeProperty;
@@ -470,7 +470,9 @@ public class PutDatabaseRecord extends AbstractProcessor {
                                 return;
                             }
 
-                            final boolean includePrimaryKeys = 
UPDATE_TYPE.equalsIgnoreCase(statementType) && updateKeys == null;
+                            // Always get the primary keys if Update Keys is 
empty. Otherwise if we have an Insert statement first, the table will be
+                            // cached but the primary keys will not be 
retrieved, causing future UPDATE statements to not have primary keys available
+                            final boolean includePrimaryKeys = (updateKeys == 
null);
 
                             // get the database schema from the cache, if one 
exists. We do this in a synchronized block, rather than
                             // using a ConcurrentMap because the Map that we 
are using is a LinkedHashMap with a capacity such that if
@@ -1056,53 +1058,33 @@ public class PutDatabaseRecord extends 
AbstractProcessor {
 
     static class SchemaKey {
         private final String catalog;
+        private final String schemaName;
         private final String tableName;
 
-        public SchemaKey(final String catalog, final String tableName) {
+        public SchemaKey(final String catalog, final String schemaName, final 
String tableName) {
             this.catalog = catalog;
+            this.schemaName = schemaName;
             this.tableName = tableName;
         }
 
         @Override
         public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((catalog == null) ? 0 : 
catalog.hashCode());
-            result = prime * result + ((tableName == null) ? 0 : 
tableName.hashCode());
+            int result = catalog != null ? catalog.hashCode() : 0;
+            result = 31 * result + (schemaName != null ? schemaName.hashCode() 
: 0);
+            result = 31 * result + tableName.hashCode();
             return result;
         }
 
         @Override
-        public boolean equals(final Object obj) {
-            if (this == obj) {
-                return true;
-            }
-            if (obj == null) {
-                return false;
-            }
-            if (getClass() != obj.getClass()) {
-                return false;
-            }
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
 
-            final SchemaKey other = (SchemaKey) obj;
-            if (catalog == null) {
-                if (other.catalog != null) {
-                    return false;
-                }
-            } else if (!catalog.equals(other.catalog)) {
-                return false;
-            }
-
-
-            if (tableName == null) {
-                if (other.tableName != null) {
-                    return false;
-                }
-            } else if (!tableName.equals(other.tableName)) {
-                return false;
-            }
+            SchemaKey schemaKey = (SchemaKey) o;
 
-            return true;
+            if (catalog != null ? !catalog.equals(schemaKey.catalog) : 
schemaKey.catalog != null) return false;
+            if (schemaName != null ? !schemaName.equals(schemaKey.schemaName) 
: schemaKey.schemaName != null) return false;
+            return tableName.equals(schemaKey.tableName);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/097548da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 6224e0e..bb12fb4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -345,6 +345,66 @@ class TestPutDatabaseRecord {
     }
 
     @Test
+    void testUpdateAfterInsert() throws InitializationException, 
ProcessException, SQLException, IOException {
+        recreateTable("PERSONS", createPersons)
+        final MockRecordParser parser = new MockRecordParser()
+        runner.addControllerService("parser", parser)
+        runner.enableControllerService(parser)
+
+        parser.addSchemaField("id", RecordFieldType.INT)
+        parser.addSchemaField("name", RecordFieldType.STRING)
+        parser.addSchemaField("code", RecordFieldType.INT)
+
+        parser.addRecord(1, 'rec1', 101)
+        parser.addRecord(2, 'rec2', 102)
+
+        runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.INSERT_TYPE)
+        runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')
+
+        runner.enqueue(new byte[0])
+        runner.run()
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        final Connection conn = dbcp.getConnection()
+        Statement stmt = conn.createStatement()
+        stmt = conn.createStatement()
+        ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(101, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(102, rs.getInt(3))
+        assertFalse(rs.next())
+        stmt.close()
+        runner.clearTransferState()
+
+        parser.addRecord(1, 'rec1', 201)
+        parser.addRecord(2, 'rec2', 202)
+        runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, 
PutDatabaseRecord.UPDATE_TYPE)
+        runner.enqueue(new byte[0])
+        runner.run(1,true,false)
+
+        runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
+        stmt = conn.createStatement()
+        rs = stmt.executeQuery('SELECT * FROM PERSONS')
+        assertTrue(rs.next())
+        assertEquals(1, rs.getInt(1))
+        assertEquals('rec1', rs.getString(2))
+        assertEquals(201, rs.getInt(3))
+        assertTrue(rs.next())
+        assertEquals(2, rs.getInt(1))
+        assertEquals('rec2', rs.getString(2))
+        assertEquals(202, rs.getInt(3))
+        assertFalse(rs.next())
+        stmt.close()
+        conn.close()
+    }
+
+    @Test
     void testUpdateNoPrimaryKeys() throws InitializationException, 
ProcessException, SQLException, IOException {
         recreateTable("PERSONS", 'CREATE TABLE PERSONS (id integer, name 
varchar(100), code integer)')
         final MockRecordParser parser = new MockRecordParser()

Reply via email to