NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL

This closes #1701.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d66eac2e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d66eac2e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d66eac2e

Branch: refs/heads/master
Commit: d66eac2ea10047d0e12eadb597333e5095076953
Parents: da0454d
Author: Matt Burgess <[email protected]>
Authored: Wed Apr 26 14:53:21 2017 -0400
Committer: Koji Kawamura <[email protected]>
Committed: Thu Apr 27 08:42:01 2017 +0900

----------------------------------------------------------------------
 .../mysql/processors/CaptureChangeMySQL.java    |  1 +
 .../processors/CaptureChangeMySQLTest.groovy    | 40 ++++++++++++++++++++
 2 files changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index e5bc8e0..96be0c9 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -797,6 +797,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                                 || normalizedQuery.startsWith("alter ignore table")
                                 || normalizedQuery.startsWith("create table")
                                 || normalizedQuery.startsWith("truncate table")
+                                || normalizedQuery.startsWith("rename table")
                                 || normalizedQuery.startsWith("drop table")
                                 || normalizedQuery.startsWith("drop database")) {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 1e80383..8106e40 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -787,6 +787,46 @@ class CaptureChangeMySQLTest {
         testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
     }
 
+    @Test
+    void testRenameTable() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // RENAME TABLE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'RENAME TABLE myTable TO myTable2'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        assertEquals(1, resultFiles.size())
+    }
+
     /********************************
      * Mock and helper classes below
      ********************************/

Reply via email to