Repository: nifi
Updated Branches:
  refs/heads/master 3386839eb -> 8f37ad451


http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
new file mode 100644
index 0000000..ef50bfa
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient
+import com.github.shyiko.mysql.binlog.event.Event
+
+import java.util.concurrent.TimeoutException
+
+/**
+ * A mock implementation of BinaryLogClient, used to unit test the connection and event handling logic.
+ * connect() and disconnect() only toggle the connected flag (or throw, depending on the failure flags),
+ * and listeners are tracked in lists owned by this class so tests can push events to them via sendEvent().
+ */
+class MockBinlogClient extends BinaryLogClient {
+
+    // Connection settings, copied from the constructor arguments
+    String hostname
+    int port
+    String username
+    String password
+
+    // Set to true by connect() and back to false by disconnect()
+    boolean connected
+    // When true, connect() throws a TimeoutException
+    boolean connectionTimeout = false
+    // When true (and connectionTimeout is false), connect() throws an IOException
+    boolean connectionError = false
+
+    // Listeners registered through the overridden register* methods
+    List<BinaryLogClient.EventListener> eventListeners = []
+    List<BinaryLogClient.LifecycleListener> lifecycleListeners = []
+
+
+    MockBinlogClient(String hostname, int port, String username, String password) {
+        super(hostname, port, username, password)
+        this.hostname = hostname
+        this.port = port
+        this.username = username
+        this.password = password
+    }
+
+    /**
+     * Simulates a connection attempt. The checks run in a fixed order: timeout flag, error flag, null password.
+     *
+     * @param timeoutInMilliseconds not used by the mock
+     * @throws TimeoutException if connectionTimeout is set
+     * @throws IOException if connectionError is set
+     * @throws NullPointerException if the password is null
+     */
+    @Override
+    void connect(long timeoutInMilliseconds) throws IOException, TimeoutException {
+        if (connectionTimeout) {
+            throw new TimeoutException('Connection timed out')
+        }
+        if (connectionError) {
+            throw new IOException('Error during connect')
+        }
+        Objects.requireNonNull(password, "Password can't be null")
+        connected = true
+    }
+
+    /**
+     * Marks the client as disconnected.
+     */
+    @Override
+    void disconnect() throws IOException {
+        connected = false
+    }
+
+
+    /**
+     * Adds the event listener unless it is already registered.
+     */
+    @Override
+    void registerEventListener(BinaryLogClient.EventListener eventListener) {
+        if (eventListener in eventListeners) {
+            return
+        }
+        eventListeners << eventListener
+    }
+
+    /**
+     * Removes the event listener if it is registered.
+     */
+    @Override
+    void unregisterEventListener(BinaryLogClient.EventListener eventListener) {
+        eventListeners.remove(eventListener)
+    }
+
+    /**
+     * Adds the lifecycle listener unless it is already registered.
+     */
+    @Override
+    void registerLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
+        if (lifecycleListener in lifecycleListeners) {
+            return
+        }
+        lifecycleListeners << lifecycleListener
+    }
+
+    /**
+     * Removes the lifecycle listener if it is registered.
+     */
+    @Override
+    void unregisterLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
+        lifecycleListeners.remove(lifecycleListener)
+    }
+
+    /**
+     * Delivers the given event to every registered event listener, in registration order.
+     *
+     * @param event the binlog event to dispatch
+     * @return the list of event listeners (the result of each())
+     */
+    def sendEvent(Event event) {
+        return eventListeners.each { listener -> listener.onEvent(event) }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
new file mode 100644
index 0000000..5591a62
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData
+import com.github.shyiko.mysql.binlog.event.Event
+import com.github.shyiko.mysql.binlog.event.EventData
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4
+import com.github.shyiko.mysql.binlog.event.EventType
+import com.github.shyiko.mysql.binlog.event.QueryEventData
+import com.github.shyiko.mysql.binlog.event.RotateEventData
+import com.github.shyiko.mysql.binlog.event.TableMapEventData
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
+import org.apache.commons.io.output.WriterOutputStream
+import org.apache.nifi.cdc.mysql.MockBinlogClient
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.components.state.Scope
+import org.apache.nifi.controller.AbstractControllerService
+import org.apache.nifi.distributed.cache.client.Deserializer
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
+import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
+import org.apache.nifi.distributed.cache.client.Serializer
+import org.apache.nifi.flowfile.attributes.CoreAttributes
+import org.apache.nifi.logging.ComponentLog
+
+import org.apache.nifi.cdc.event.ColumnDefinition
+import org.apache.nifi.cdc.event.TableInfo
+import org.apache.nifi.cdc.event.TableInfoCacheKey
+import org.apache.nifi.cdc.event.io.EventWriter
+import org.apache.nifi.provenance.ProvenanceEventType
+import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.state.MockStateManager
+import org.apache.nifi.util.MockComponentLog
+import org.apache.nifi.util.MockControllerServiceInitializationContext
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+
+import java.sql.Connection
+import java.sql.ResultSet
+import java.sql.SQLException
+import java.sql.Statement
+import java.util.regex.Matcher
+import java.util.regex.Pattern
+
+import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertTrue
+import static org.mockito.Matchers.anyString
+import static org.mockito.Mockito.mock
+import static org.mockito.Mockito.when
+
+/**
+ * Unit test(s) for MySQL CDC
+ */
+class CaptureChangeMySQLTest {
+    CaptureChangeMySQL processor
+    TestRunner testRunner
+    MockBinlogClient client
+
+    /**
+     * Creates a fresh processor, test runner and mock binlog client before each test.
+     */
+    @Before
+    void setUp() throws Exception {
+        processor = new MockCaptureChangeMySQL()
+        testRunner = TestRunners.newTestRunner(processor)
+        // The mock client is built with fixed connection settings; tests push binlog events through it
+        client = new MockBinlogClient('localhost', 3306, 'root', 'password')
+    }
+
+    /**
+     * Intentionally empty: setUp() rebuilds the processor, runner and client for every test.
+     */
+    @After
+    void tearDown() throws Exception {
+
+    }
+
+    /**
+     * Pushes a ROTATE, a BEGIN and a COMMIT (XID) event through the mock client and runs the processor.
+     * The test contains no explicit assertions and does not set the PASSWORD property.
+     */
+    @Test
+    void testBeginCommitTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
+        RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        client.sendEvent(new Event(rotateHeader, rotateData))
+
+        // BEGIN
+        EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
+        QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        client.sendEvent(new Event(beginHeader, beginData))
+
+        // COMMIT
+        EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
+        EventData commitData = {} as EventData
+        client.sendEvent(new Event(commitHeader, commitData))
+
+        testRunner.run(1, true, false)
+    }
+
+    /**
+     * Verifies that a sequence ID already stored in the state map wins over the Initial Sequence ID
+     * property: state holds '1', the property is '10', and the emitted flow files must be numbered from 1.
+     */
+    @Test
+    void testInitialSequenceIdIgnoredWhenStatePresent() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
+        // Seed the cluster state with a sequence ID before the processor starts
+        testRunner.getStateManager().setState([(EventWriter.SEQUENCE_ID_KEY): '1'], Scope.CLUSTER)
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+        // Guard against a vacuous pass: with no flow files the loop below would assert nothing
+        assertTrue('Expected at least one flow file on REL_SUCCESS', !resultFiles.isEmpty())
+
+        resultFiles.eachWithIndex { e, i ->
+            // Sequence ID should start from 1 (as was put into the state map), showing that the
+            // Initial Sequence ID value was ignored
+            assertEquals(i + 1, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
+        }
+    }
+
+    /**
+     * Verifies that, with no sequence ID in the state map, the Initial Sequence ID property ('10')
+     * is used as the starting sequence ID of the emitted flow files.
+     */
+    @Test
+    void testInitialSequenceIdNoStatePresent() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+
+        // Guard against a vacuous pass: with no flow files the loop below would assert nothing
+        assertTrue('Expected at least one flow file on REL_SUCCESS', !resultFiles.isEmpty())
+
+        resultFiles.eachWithIndex { e, i ->
+            // Sequence IDs should be numbered from the configured Initial Sequence ID
+            assertEquals(i + 10, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
+        }
+    }
+
+    /**
+     * Sends a lone COMMIT (XID) event with no preceding BEGIN and runs the processor.
+     * The test passes only if an AssertionError is thrown.
+     */
+    @Test(expected = AssertionError.class)
+    void testCommitWithoutBegin() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+
+        testRunner.run(1, false, true)
+
+        // COMMIT
+        EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
+        EventData commitData = {} as EventData
+        client.sendEvent(new Event(commitHeader, commitData))
+
+        testRunner.run(1, true, false)
+    }
+
+    /**
+     * Drives three transactions (insert, update, then schema change plus delete, with a binlog rotation
+     * before the third) through the processor with a cache client configured, and checks the sequence ID,
+     * MIME type, binlog filename/position and CDC event type of each of the 13 emitted flow files, plus
+     * the provenance events.
+     */
+    @Test
+    void testExtendedTransaction() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+        def clientProperties = [:]
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+        testRunner.addControllerService('client', cacheClient, clientProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+        testRunner.enableControllerService(cacheClient)
+
+
+        testRunner.run(1, false, true)
+
+        // ROTATE scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // INSERT scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        def cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        // UPDATE scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 16] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 18] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        def colsBefore = new BitSet()
+        colsBefore.set(0)
+        colsBefore.set(1)
+        def colsAfter = new BitSet()
+        colsAfter.set(1)
+        // A single before/after row pair: key is the row before the update, value the row after
+        Map.Entry<Serializable[], Serializable[]> updateMapEntry = new Map.Entry<Serializable[], Serializable[]>() {
+            @Override
+            Serializable[] getKey() {
+                return [2, 'Smith'] as Serializable[]
+            }
+
+            @Override
+            Serializable[] getValue() {
+                return [3, 'Jones'] as Serializable[]
+            }
+
+            @Override
+            Serializable[] setValue(Serializable[] value) {
+                return [3, 'Jones'] as Serializable[]
+            }
+        }
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.UPDATE_ROWS, nextPosition: 20] as EventHeaderV4,
+                [tableId: 1, includedColumnsBeforeUpdate: colsBefore, includedColumns: colsAfter, rows: [updateMapEntry]
+                        as List<Map.Entry<Serializable[], Serializable[]>>] as UpdateRowsEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 24] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        // ROTATE scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 26] as EventHeaderV4,
+                [binlogFilename: 'master.000002', binlogPosition: 4L] as RotateEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 28] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 30] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+                [database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+        ))
+
+        // DELETE scenario
+        cols = new BitSet()
+        cols.set(0)
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.DELETE_ROWS, nextPosition: 36] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols, rows: [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[]] as List<Serializable[]>] as DeleteRowsEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 40] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // One entry per expected flow file, in emission order
+        List<String> expectedEventTypes = [
+                'begin', 'write', 'write', 'write', 'commit',
+                'begin', 'update', 'commit',
+                'begin', 'schema_change', 'delete', 'delete', 'commit'
+        ]
+
+        // Check the count first so a missing or extra flow file is reported directly
+        assertEquals(13, resultFiles.size())
+
+        resultFiles.eachWithIndex { e, i ->
+            assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
+            assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
+            // The first 8 flow files precede the rotation to master.000002
+            assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
+            assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
+            // JUnit convention: expected value first, actual value second
+            assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
+        }
+        assertEquals(13, testRunner.provenanceEvents.size())
+        testRunner.provenanceEvents.each { assertEquals(ProvenanceEventType.RECEIVE, it.eventType)}
+    }
+
+    /**
+     * With a cache client configured, sends a WRITE ROWS event inside a transaction without any preceding
+     * TABLE MAP event. The test passes only if an AssertionError is thrown.
+     */
+    @Test(expected = AssertionError.class)
+    void testNoTableInformationAvailable() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+        def clientProperties = [:]
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+        testRunner.addControllerService('client', cacheClient, clientProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+        testRunner.enableControllerService(cacheClient)
+
+        testRunner.run(1, false, true)
+
+        // ROTATE scenario
+        EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
+        RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        client.sendEvent(new Event(rotateHeader, rotateData))
+
+        // INSERT scenario
+        EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
+        QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        client.sendEvent(new Event(beginHeader, beginData))
+
+        // WRITE ROWS with no TABLE MAP event sent beforehand
+        def cols = new BitSet()
+        cols.set(1)
+        EventHeaderV4 writeHeader = [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4
+        WriteRowsEventData writeData = [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        client.sendEvent(new Event(writeHeader, writeData))
+
+        // COMMIT
+        EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
+        EventData commitData = {} as EventData
+        client.sendEvent(new Event(commitHeader, commitData))
+
+        testRunner.run(1, true, false)
+    }
+
+    /**
+     * Verifies the database and table name pattern filters. The patterns are 'myDB' and 'user'.
+     * The first transaction writes to a non-matching table ('users') and then to the matching one;
+     * the second transaction writes to a table in a non-matching database. Only the row event for the
+     * matching database/table pair should be emitted, together with the BEGIN and COMMIT events.
+     */
+    @Test
+    void testSkipTable() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
+        testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        def cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // TABLE MAP for table matching, all modification events (1) should be emitted
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // WRITE ROWS for matching table
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        ////////////////////////
+        // Test database filter
+        ////////////////////////
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // TABLE MAP for database not matching the regex. The table name matches the table pattern,
+        // so only the database pattern can cause the following row event to be skipped.
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'notMyDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        // This WRITE ROWS should be skipped
+        cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        ))
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // BEGIN + WRITE + COMMIT from the table filter transaction, BEGIN + COMMIT from the
+        // database filter transaction
+        assertEquals(5, resultFiles.size())
+    }
+
+    /**
+     * Splits one transaction across two processor runs: ROTATE, BEGIN and TABLE MAP are sent before the
+     * processor is stopped (1 flow file expected), then WRITE ROWS and COMMIT are sent after it is
+     * re-initialized (3 flow files expected in total).
+     */
+    @Test
+    void testTransactionAcrossMultipleProcessorExecutions() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
+        RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        client.sendEvent(new Event(rotateHeader, rotateData))
+
+        // BEGIN
+        EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
+        QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        client.sendEvent(new Event(beginHeader, beginData))
+
+        // TABLE MAP
+        EventHeaderV4 tableMapHeader = [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4
+        TableMapEventData tableMapData = [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        client.sendEvent(new Event(tableMapHeader, tableMapData))
+
+        // Run and Stop the processor
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        assertEquals(1, resultFiles.size())
+
+        // Re-initialize the processor so it can receive events
+        testRunner.run(1, false, true)
+
+        // WRITE ROWS. The original note here read "This WRITE ROWS should be skipped".
+        // NOTE(review): confirm that against the expected total of 3 flow files asserted below.
+        def cols = new BitSet()
+        cols.set(1)
+        EventHeaderV4 writeHeader = [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4
+        WriteRowsEventData writeData = [tableId: 1, includedColumns: cols,
+                 rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
+        client.sendEvent(new Event(writeHeader, writeData))
+
+        // COMMIT
+        EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4
+        EventData commitData = {} as EventData
+        client.sendEvent(new Event(commitHeader, commitData))
+
+        // Run and Stop the processor
+        testRunner.run(1, true, false)
+
+
+        resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        assertEquals(3, resultFiles.size())
+    }
+
+    /**
+     * Checks when the binlog filename and position are written to cluster-scoped state.
+     * The test asserts three things in order: nothing is stored while the processor keeps
+     * running with no State Update Interval configured; the values are stored once the
+     * processor is stopped; and, after a State Update Interval of '1 second' is set, the
+     * values are stored during a run without stopping the processor.
+     */
+    @Test
+    void testUpdateState() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+
+        // Initialize the processor and leave it running so it can receive events
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, 
nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as 
RotateEventData
+        ))
+
+        // Process the queued event without stopping or re-initializing the processor
+        testRunner.run(1, false, false)
+
+        // Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
null, Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
null, Scope.CLUSTER)
+
+        // Stop the processor and verify the state is set
+        // (expected values match the binlogFilename/binlogPosition of the ROTATE event above)
+        testRunner.run(1, true, false)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'master.000001', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'4', Scope.CLUSTER)
+
+        // Start the second scenario from empty state
+        testRunner.stateManager.clear(Scope.CLUSTER)
+
+        // Send some events, wait for the State Update Interval, and verify 
the state was set
+        testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 
second')
+        testRunner.run(1, false, true)
+
+        // ROTATE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, 
nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as 
RotateEventData
+        ))
+
+        // BEGIN
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, 
nextPosition: 6] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        // Let the 1-second State Update Interval elapse before the next run
+        sleep(1000)
+
+        testRunner.run(1, false, false)
+
+        // State is expected without stopping; '6' matches the nextPosition of the BEGIN event
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'master.000001', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'6', Scope.CLUSTER)
+
+        // COMMIT
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, 
nextPosition: 12] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        // Run once more and stop the processor
+        testRunner.run(1, true, false)
+
+        // '12' matches the nextPosition of the COMMIT (XID) event
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'master.000001', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'12', Scope.CLUSTER)
+
+    }
+
+    /********************************
+     * Mock and helper classes below
+     ********************************/
+
+    /**
+     * Test double for CaptureChangeMySQL that avoids real network and database access:
+     * it hands back the test's binlog client, serves table metadata from a local map,
+     * and returns Mockito-stubbed JDBC objects.
+     */
+    class MockCaptureChangeMySQL extends CaptureChangeMySQL {
+
+        Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>()
+
+        /** Ignores the connection arguments and returns the enclosing test's client. */
+        @Override
+        BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+            return client
+        }
+
+        /**
+         * Returns the entry held in the local map for the key; on a miss, builds a
+         * two-column table ('id', 'string1'), remembers it, and returns it.
+         */
+        @Override
+        protected TableInfo loadTableInfo(TableInfoCacheKey key) {
+            TableInfo known = cache.get(key)
+            if (known != null) {
+                return known
+            }
+
+            List<ColumnDefinition> columns = [new ColumnDefinition((byte) 4, 'id'),
+                                              new ColumnDefinition((byte) -4, 'string1')
+            ] as List<ColumnDefinition>
+            TableInfo created = new TableInfo(key.databaseName, key.tableName, key.tableId, columns)
+            cache.put(key, created)
+            return created
+        }
+
+        /**
+         * Returns a mocked Connection whose createStatement() yields a mocked Statement,
+         * which in turn answers any executeQuery(String) with a mocked ResultSet.
+         */
+        @Override
+        protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
+                throws InitializationException, SQLException {
+            ResultSet stubResults = mock(ResultSet)
+            Statement stubStatement = mock(Statement)
+            when(stubStatement.executeQuery(anyString())).thenReturn(stubResults)
+
+            Connection stubConnection = mock(Connection)
+            when(stubConnection.createStatement()).thenReturn(stubStatement)
+            return stubConnection
+        }
+    }
+
+
+    /**
+     * Builds a DistributedMapCacheClientImpl and initializes it with a mock
+     * controller-service context (identifier "client", mock logger, mock state manager).
+     *
+     * @return the initialized cache client
+     * @throws InitializationException if initialization of the service fails
+     */
+    static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {
+        final DistributedMapCacheClientImpl cacheClient = new DistributedMapCacheClientImpl()
+        final ComponentLog componentLog = new MockComponentLog("client", cacheClient)
+
+        cacheClient.initialize(
+                new MockControllerServiceInitializationContext(cacheClient, "client", componentLog, new MockStateManager(cacheClient)))
+
+        return cacheClient
+    }
+
+    /**
+     * In-memory DistributedMapCacheClient for tests. Keys and values are run through
+     * the supplied serializers and kept as Strings in a HashMap; reads convert the stored
+     * String back to bytes and hand them to the supplied deserializer.
+     *
+     * Not thread-safe: the backing map is a plain HashMap.
+     */
+    static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
+
+        private Map<String, String> cacheMap = new HashMap<>()
+
+        /** Nothing to release; the cache lives only in memory. */
+        @Override
+        void close() throws IOException {
+        }
+
+        /** Property changes are ignored by this mock. */
+        @Override
+        void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        }
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+
+            return [DistributedMapCacheClientService.HOSTNAME,
+                    DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
+                    DistributedMapCacheClientService.PORT,
+                    DistributedMapCacheClientService.SSL_CONTEXT_SERVICE]
+        }
+
+        /**
+         * Serializes an item and returns the result as a String.
+         *
+         * WriterOutputStream buffers decoded characters and only hands them to the
+         * underlying Writer on overflow, flush() or close(); without the explicit flush
+         * a short payload would never reach the StringWriter and the result would be empty.
+         *
+         * @param item the key or value to serialize
+         * @param serializer the serializer to apply
+         * @return the serialized form of the item as a String
+         * @throws IOException if serialization or flushing fails
+         */
+        private static <T> String serializeToString(final T item, final Serializer<T> serializer) throws IOException {
+            StringWriter writer = new StringWriter()
+            WriterOutputStream out = new WriterOutputStream(writer)
+            serializer.serialize(item, out)
+            out.flush()
+            return writer.toString()
+        }
+
+        /**
+         * Stores the value only if the key is not already present.
+         *
+         * @return true if the value was stored, false if the key already existed
+         */
+        @Override
+        <K, V> boolean putIfAbsent(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+
+            String keyString = serializeToString(key, keySerializer)
+
+            if (cacheMap.containsKey(keyString)) return false
+
+            cacheMap.put(keyString, serializeToString(value, valueSerializer))
+            return true
+        }
+
+        /**
+         * Returns the deserialized existing value if the key is present; otherwise stores
+         * the given value and returns null.
+         */
+        @Override
+        @SuppressWarnings("unchecked")
+        <K, V> V getAndPutIfAbsent(
+                final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+                final Deserializer<V> valueDeserializer) throws IOException {
+            String keyString = serializeToString(key, keySerializer)
+
+            if (cacheMap.containsKey(keyString)) return valueDeserializer.deserialize(cacheMap.get(keyString).bytes)
+
+            cacheMap.put(keyString, serializeToString(value, valueSerializer))
+            return null
+        }
+
+        /** @return true if an entry exists for the serialized key */
+        @Override
+        <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+            String keyString = serializeToString(key, keySerializer)
+
+            return cacheMap.containsKey(keyString)
+        }
+
+        /** @return the deserialized value for the key, or null if the key is not present */
+        @Override
+        <K, V> V get(
+                final K key,
+                final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+            String keyString = serializeToString(key, keySerializer)
+
+            return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null
+        }
+
+        /** @return true if an entry existed for the key and was removed */
+        @Override
+        <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+            String keyString = serializeToString(key, serializer)
+
+            boolean removed = (cacheMap.containsKey(keyString))
+            cacheMap.remove(keyString)
+            return removed
+        }
+
+        /**
+         * Removes every entry whose key fully matches the regular expression.
+         *
+         * @param regex pattern matched against the entire serialized key
+         * @return the number of entries removed
+         */
+        @Override
+        long removeByPattern(String regex) throws IOException {
+            Pattern p = Pattern.compile(regex)
+            // Collect matching KEYS first, then remove, to avoid modifying the map while iterating it
+            final List<String> matchingKeys = new ArrayList<>()
+            for (String key : cacheMap.keySet()) {
+                if (p.matcher(key).matches()) {
+                    matchingKeys.add(key)
+                }
+            }
+            final long numRemoved = matchingKeys.size()
+            matchingKeys.each { cacheMap.remove(it) }
+            return numRemoved
+        }
+
+        /** Stores the value under the key, replacing any existing entry. */
+        @Override
+        <K, V> void put(
+                final K key,
+                final V value,
+                final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+            String keyString = serializeToString(key, keySerializer)
+            cacheMap.put(keyString, serializeToString(value, valueSerializer))
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
new file mode 100644
index 0000000..5ee7224
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.junit.Test;
+
+import java.sql.Types;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+
+/**
+ * Unit Tests for MySQLCDCUtils utility class
+ */
+public class MySQLCDCUtilsTest {
+    /**
+     * Exercises MySQLCDCUtils.getWritableObject with null values, a byte value typed
+     * as INTEGER, and a byte array typed as VARCHAR.
+     */
+    @Test
+    public void testGetWritableObject() throws Exception {
+        // A null value is expected to come back as null, with or without a type
+        assertNull(MySQLCDCUtils.getWritableObject(null, null));
+        assertNull(MySQLCDCUtils.getWritableObject(Types.INTEGER, null));
+
+        // A byte value typed as INTEGER is expected to come back unchanged
+        final byte one = (byte) 1;
+        assertEquals(one, MySQLCDCUtils.getWritableObject(Types.INTEGER, one));
+
+        // A byte array typed as VARCHAR is expected to come back as the equivalent String
+        final String greeting = "Hello";
+        assertEquals(greeting, MySQLCDCUtils.getWritableObject(Types.VARCHAR, greeting.getBytes()));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/pom.xml 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/pom.xml
new file mode 100644
index 0000000..559c417
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-cdc-mysql-bundle</artifactId>
+    <packaging>pom</packaging>
+    <description>NiFi MySQL CDC Bundle</description>
+    <modules>
+        <module>nifi-cdc-mysql-processors</module>
+        <module>nifi-cdc-mysql-nar</module>
+    </modules>
+    <dependencyManagement>
+        <dependencies>
+          <dependency>
+              <groupId>org.apache.nifi</groupId>
+              <artifactId>nifi-cdc-mysql-processors</artifactId>
+              <version>1.2.0-SNAPSHOT</version>
+          </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/pom.xml 
b/nifi-nar-bundles/nifi-cdc/pom.xml
new file mode 100644
index 0000000..b68b92e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-cdc</artifactId>
+    <packaging>pom</packaging>
+    <modules>
+        <module>nifi-cdc-api</module>
+        <module>nifi-cdc-mysql-bundle</module>
+    </modules>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index ea50bbc..6ce7526 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -79,6 +79,7 @@
         <module>nifi-registry-bundle</module>
         <module>nifi-stateful-analysis-bundle</module>
         <module>nifi-poi-bundle</module>
+        <module>nifi-cdc</module>
   </modules>
 
     <build>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e621efa..4e2b3cd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1332,6 +1332,12 @@ language governing permissions and limitations under the 
License. -->
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-cdc-mysql-nar</artifactId>
+                <version>1.2.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>1.2.0-SNAPSHOT</version>
             </dependency>

Reply via email to