NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL This closes #1701.
Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d66eac2e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d66eac2e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d66eac2e Branch: refs/heads/master Commit: d66eac2ea10047d0e12eadb597333e5095076953 Parents: da0454d Author: Matt Burgess <[email protected]> Authored: Wed Apr 26 14:53:21 2017 -0400 Committer: Koji Kawamura <[email protected]> Committed: Thu Apr 27 08:42:01 2017 +0900 ---------------------------------------------------------------------- .../mysql/processors/CaptureChangeMySQL.java | 1 + .../processors/CaptureChangeMySQLTest.groovy | 40 ++++++++++++++++++++ 2 files changed, 41 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index e5bc8e0..96be0c9 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -797,6 +797,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { || normalizedQuery.startsWith("alter ignore table") || normalizedQuery.startsWith("create table") || normalizedQuery.startsWith("truncate table") + || normalizedQuery.startsWith("rename table") || normalizedQuery.startsWith("drop table") || normalizedQuery.startsWith("drop database")) { http://git-wip-us.apache.org/repos/asf/nifi/blob/d66eac2e/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 1e80383..8106e40 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -787,6 +787,46 @@ class CaptureChangeMySQLTest { testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1) } + @Test + void testRenameTable() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true') + + testRunner.run(1, false, true) + + // ROTATE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4, + [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData + )) + + // BEGIN + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'BEGIN'] as QueryEventData + )) + + // RENAME TABLE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'RENAME TABLE myTable TO myTable2'] as QueryEventData + )) + + // COMMIT + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4, + {} as EventData + )) + + testRunner.run(1, true, false) + + def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) + assertEquals(1, resultFiles.size()) + } + /******************************** * Mock and helper classes below ********************************/
