Repository: nifi
Updated Branches:
  refs/heads/master 24bb8cf95 -> 097548da9


NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL

This closes #1702.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da0454d8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da0454d8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da0454d8

Branch: refs/heads/master
Commit: da0454d80f5c0e5b0268dbd279a80d21d62e7c85
Parents: 24bb8cf
Author: Matt Burgess <[email protected]>
Authored: Wed Apr 26 15:19:12 2017 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Thu Apr 27 08:35:34 2017 +0900

----------------------------------------------------------------------
 .../mysql/processors/CaptureChangeMySQL.java    |  4 +++
 .../processors/CaptureChangeMySQLTest.groovy    | 27 ++++++++++++++++++++
 2 files changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index a8c3336..e5bc8e0 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                             if (cacheClient != null) {
                                 cacheClient.removeByPattern(this.getIdentifier() + ".*");
                             }
+                            // If not in a transaction, commit the session so the DDL event(s) will be transferred
+                            if (includeDDLEvents && !inTransaction) {
+                                session.commit();
+                            }
                         }
                     }
                     break;

http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index eb1f32b..1e80383 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -760,6 +760,33 @@ class CaptureChangeMySQLTest {
 
     }
 
+    @Test
+    void testDDLOutsideTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // DROP TABLE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData
+        ))
+
+        testRunner.run(1, false, false)
+        testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/

Reply via email to