[
https://issues.apache.org/jira/browse/NIFI-3728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15982500#comment-15982500
]
ASF GitHub Bot commented on NIFI-3728:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1690#discussion_r113112505
--- Diff:
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
---
@@ -362,6 +363,79 @@ class CaptureChangeMySQLTest {
testRunner.provenanceEvents.each {
assertEquals(ProvenanceEventType.RECEIVE, it.eventType)}
}
+ @Test
+ void testExcludeSchemaChanges() throws Exception {
+ testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION,
'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+ testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+ testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
+ testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2
seconds')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME,
'master.000001')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION,
'4')
+ testRunner.setProperty(CaptureChangeMySQL.INCLUDE_SCHEMA_CHANGES,
'false')
+ final DistributedMapCacheClientImpl cacheClient =
createCacheClient()
+ def clientProperties = [:]
+
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
+ testRunner.addControllerService('client', cacheClient,
clientProperties)
+ testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT,
'client')
+ testRunner.enableControllerService(cacheClient)
+
+
+ testRunner.run(1, false, true)
+
+ // ROTATE scenario
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.ROTATE,
nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as
RotateEventData
+ ))
+
+ // INSERT scenario
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY,
nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType:
EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'myTable',
columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ def cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType:
EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as
Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as
WriteRowsEventData
+ ))
+
+ // ALTER TABLE
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY,
nextPosition: 32] as EventHeaderV4,
+ [database: 'myDB', sql: 'ALTER TABLE myTable add column
col1 int'] as QueryEventData
+ ))
+
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID,
nextPosition: 40] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles =
testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // No 'schema_change' events expected
+ List<String> expectedEventTypes = ([] + 'begin' +
Collections.nCopies(3, 'insert') + 'commit')
+
+ resultFiles.eachWithIndex { e, i ->
+ assertEquals(i,
Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
+ assertEquals(EventWriter.APPLICATION_JSON,
e.getAttribute(CoreAttributes.MIME_TYPE.key()))
+ assertEquals((i < 8) ? 'master.000001' : 'master.000002',
e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
+
assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) %
4 == 0L)
+ assertEquals(e.getAttribute('cdc.event.type'),
expectedEventTypes[i])
--- End diff --
`assertEquals` takes two arguments, and the 1st is the expected value (the
2nd is the actual one). This line should be:
```
assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
```
I got confused when I saw the assertion error message, because the expected and actual values were reported the wrong way round.
> CaptureChangeMySQL to capture truncate table statement
> ------------------------------------------------------
>
> Key: NIFI-3728
> URL: https://issues.apache.org/jira/browse/NIFI-3728
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.2.0
> Reporter: Koji Kawamura
> Assignee: Matt Burgess
> Fix For: 1.2.0
>
>
> CaptureChangeMySQL captures several specific queries, such as 'alter
> table', 'create table' and 'drop table'. However, it currently does
> not capture 'truncate table', and support for it should be added.
> Also, this may be off topic, but I think these [DDL |
> https://en.wikipedia.org/wiki/Data_definition_language] statements can be
> dangerous, and some use cases may require human intervention before executing
> them, instead of fully automating them with PutDatabaseRecord. It would be
> useful if CaptureChangeMySQL had a 'Capture DDL statements' property.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)