Repository: nifi Updated Branches: refs/heads/master 24bb8cf95 -> 097548da9
NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL. This closes #1702. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/da0454d8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/da0454d8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/da0454d8 Branch: refs/heads/master Commit: da0454d80f5c0e5b0268dbd279a80d21d62e7c85 Parents: 24bb8cf Author: Matt Burgess <[email protected]> Authored: Wed Apr 26 15:19:12 2017 -0400 Committer: Koji Kawamura <[email protected]> Committed: Thu Apr 27 08:35:34 2017 +0900 ---------------------------------------------------------------------- .../mysql/processors/CaptureChangeMySQL.java | 4 +++ .../processors/CaptureChangeMySQLTest.groovy | 27 ++++++++++++++++++++ 2 files changed, 31 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index a8c3336..e5bc8e0 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if 
(cacheClient != null) { cacheClient.removeByPattern(this.getIdentifier() + ".*"); } + // If not in a transaction, commit the session so the DDL event(s) will be transferred + if (includeDDLEvents && !inTransaction) { + session.commit(); + } } } break; http://git-wip-us.apache.org/repos/asf/nifi/blob/da0454d8/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index eb1f32b..1e80383 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -760,6 +760,33 @@ class CaptureChangeMySQLTest { } + @Test + void testDDLOutsideTransaction() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true') + + testRunner.run(1, false, true) + + // ROTATE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4, + [binlogFilename: 
'master.000001', binlogPosition: 4L] as RotateEventData + )) + + // DROP TABLE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData + )) + + testRunner.run(1, false, false) + testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1) + } + /******************************** * Mock and helper classes below ********************************/
