NIFI-3413: Add GetChangeDataCaptureMySQL processor
NIFI-3413: Incorporated review comments
NIFI-3413: Changed GetChangeDataCaptureMySQL to CaptureChangeMySQL, fixed some 
bugs
NIFI-3413: Refactored setup() for better error handling, more review comments 
incorporated
NIFI-3413: Refactored CDC into its own module(s), updated assembly and 
top-level POMs
NIFI-3413: Added RECEIVE prov event and Server ID property

Signed-off-by: ijokarumawak <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8f37ad45
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8f37ad45
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8f37ad45

Branch: refs/heads/master
Commit: 8f37ad4512fdca1627938b9ae4fd40b02ab2999d
Parents: 3386839
Author: Matt Burgess <[email protected]>
Authored: Thu Mar 23 18:43:04 2017 -0400
Committer: ijokarumawak <[email protected]>
Committed: Fri Apr 7 00:44:42 2017 +0900

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |    5 +
 nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml  |   43 +
 .../java/org/apache/nifi/cdc/CDCException.java  |   42 +
 .../apache/nifi/cdc/event/BaseEventInfo.java    |   42 +
 .../apache/nifi/cdc/event/BaseRowEventInfo.java |   37 +
 .../nifi/cdc/event/BaseTableEventInfo.java      |   65 ++
 .../apache/nifi/cdc/event/ColumnDefinition.java |   73 ++
 .../org/apache/nifi/cdc/event/EventInfo.java    |   36 +
 .../nifi/cdc/event/RowEventException.java       |   40 +
 .../org/apache/nifi/cdc/event/RowEventInfo.java |   27 +
 .../apache/nifi/cdc/event/TableEventInfo.java   |   31 +
 .../org/apache/nifi/cdc/event/TableInfo.java    |  155 +++
 .../nifi/cdc/event/TableInfoCacheKey.java       |   98 ++
 .../nifi/cdc/event/io/AbstractEventWriter.java  |   64 ++
 .../apache/nifi/cdc/event/io/EventWriter.java   |   43 +
 .../nifi-cdc-mysql-nar/pom.xml                  |   40 +
 .../src/main/resources/META-INF/NOTICE          |    5 +
 .../nifi-cdc-mysql-processors/pom.xml           |   67 ++
 .../cdc/mysql/event/BaseBinlogEventInfo.java    |   42 +
 .../cdc/mysql/event/BaseBinlogRowEventInfo.java |   48 +
 .../mysql/event/BaseBinlogTableEventInfo.java   |   62 ++
 .../mysql/event/BeginTransactionEventInfo.java  |   27 +
 .../nifi/cdc/mysql/event/BinlogEventInfo.java   |   32 +
 .../cdc/mysql/event/BinlogEventListener.java    |   65 ++
 .../mysql/event/BinlogLifecycleListener.java    |   61 ++
 .../cdc/mysql/event/BinlogTableEventInfo.java   |   25 +
 .../mysql/event/CommitTransactionEventInfo.java |   28 +
 .../cdc/mysql/event/DeleteRowsEventInfo.java    |   32 +
 .../cdc/mysql/event/InsertRowsEventInfo.java    |   35 +
 .../nifi/cdc/mysql/event/MySQLCDCUtils.java     |   47 +
 .../nifi/cdc/mysql/event/RawBinlogEvent.java    |   45 +
 .../cdc/mysql/event/SchemaChangeEventInfo.java  |   38 +
 .../cdc/mysql/event/UpdateRowsEventInfo.java    |   41 +
 .../event/io/AbstractBinlogEventWriter.java     |   67 ++
 .../io/AbstractBinlogTableEventWriter.java      |   65 ++
 .../event/io/BeginTransactionEventWriter.java   |   25 +
 .../event/io/CommitTransactionEventWriter.java  |   27 +
 .../cdc/mysql/event/io/DeleteRowsWriter.java    |   92 ++
 .../cdc/mysql/event/io/InsertRowsWriter.java    |   92 ++
 .../mysql/event/io/SchemaChangeEventWriter.java |   43 +
 .../cdc/mysql/event/io/UpdateRowsWriter.java    |  103 ++
 .../mysql/processors/CaptureChangeMySQL.java    | 1033 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   15 +
 .../nifi/cdc/mysql/MockBinlogClient.groovy      |   97 ++
 .../processors/CaptureChangeMySQLTest.groovy    |  795 ++++++++++++++
 .../nifi/cdc/mysql/event/MySQLCDCUtilsTest.java |   39 +
 .../nifi-cdc/nifi-cdc-mysql-bundle/pom.xml      |   41 +
 nifi-nar-bundles/nifi-cdc/pom.xml               |   29 +
 nifi-nar-bundles/pom.xml                        |    1 +
 pom.xml                                         |    6 +
 50 files changed, 4111 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 7751666..fd5ce78 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -458,6 +458,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-stateful-analysis-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-mysql-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml
new file mode 100644
index 0000000..bf24dbb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    
+    <artifactId>nifi-cdc-api</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java
new file mode 100644
index 0000000..7356e4c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc;
+
+/**
+ * An exception class representing errors that occur during Change Data 
Capture (CDC) processing
+ */
+public class CDCException extends Exception {
+
+    public CDCException() {
+    }
+
+    public CDCException(String message) {
+        super(message);
+    }
+
+    public CDCException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CDCException(Throwable cause) {
+        super(cause);
+    }
+
+    public CDCException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java
new file mode 100644
index 0000000..13307bb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+
+/**
+ * A base class for all MySQL binlog events
+ */
+public class BaseEventInfo implements EventInfo {
+
+    private final String eventType;
+    private final Long timestamp;
+
+    public BaseEventInfo(String eventType, Long timestamp) {
+        this.eventType = eventType;
+        this.timestamp = timestamp;
+    }
+
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java
new file mode 100644
index 0000000..1107a91
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+
+import java.util.List;
+
+/**
+ * A base class to use for representing information to row mutation events
+ */
+public class BaseRowEventInfo<RowEventDataType> extends BaseTableEventInfo 
implements RowEventInfo<RowEventDataType> {
+
+    protected List<RowEventDataType> rows;
+
+    public BaseRowEventInfo(TableInfo tableInfo, String eventType, Long 
timestamp, List<RowEventDataType> rows) {
+        super(tableInfo, eventType, timestamp);
+        this.rows = rows;
+    }
+
+    public List<RowEventDataType> getRows() {
+        return rows;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java
new file mode 100644
index 0000000..bc34608
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import java.util.List;
+
+/**
+ * An abstract base class for all MySQL binlog events affecting a table.
+ */
+public class BaseTableEventInfo extends BaseEventInfo implements 
TableEventInfo {
+
+    private String databaseName;
+    private String tableName;
+    private Long tableId;
+
+    private List<ColumnDefinition> columns;
+
+    public BaseTableEventInfo(TableInfo tableInfo, String eventType, Long 
timestamp) {
+        super(eventType, timestamp);
+        if (tableInfo != null) {
+            this.databaseName = tableInfo.getDatabaseName();
+            this.tableName = tableInfo.getTableName();
+            this.tableId = tableInfo.getTableId();
+            this.columns = tableInfo.getColumns();
+        }
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public Long getTableId() {
+        return tableId;
+    }
+
+    public List<ColumnDefinition> getColumns() {
+        return columns;
+    }
+
+    public ColumnDefinition getColumnByIndex(int i) {
+        try {
+            return columns.get(i);
+        } catch (IndexOutOfBoundsException | NullPointerException e) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java
new file mode 100644
index 0000000..c3f14ec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+
+/**
+ * A class that specifies a definition for a relational table column, 
including type, name, etc.
+ */
+public class ColumnDefinition {
+
+    private int type;
+    private String name = "";
+
+    public ColumnDefinition(int type) {
+        this.type = type;
+    }
+
+    public ColumnDefinition(int type, String name) {
+        this(type);
+        this.name = name;
+    }
+
+    public int getType() {
+        return type;
+    }
+
+    public void setType(int type) {
+        this.type = type;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ColumnDefinition that = (ColumnDefinition) o;
+
+        return new EqualsBuilder()
+                .append(type, that.type)
+                .append(name, that.name)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        int result = type;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
new file mode 100644
index 0000000..7608cc2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+
+/**
+ * An interface representing a data structure containing event information, 
and serialization/deserlization methods.
+ */
+public interface EventInfo {
+
+    // Event type constants
+    String BEGIN_EVENT = "begin";
+    String COMMIT_EVENT = "commit";
+    String WRITE_EVENT = "write";
+    String DELETE_EVENT = "delete";
+    String UPDATE_EVENT = "update";
+    String SCHEMA_CHANGE = "schema_change";
+
+    String getEventType();
+
+    Long getTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java
new file mode 100644
index 0000000..94f2ef5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import java.io.IOException;
+
+/**
+ * A marker class for Exceptions that occur during the handling of CDC row 
modification events
+ */
+public class RowEventException extends IOException {
+
+    public RowEventException() {
+    }
+
+    public RowEventException(String message) {
+        super(message);
+    }
+
+    public RowEventException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RowEventException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java
new file mode 100644
index 0000000..b02b25b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import java.util.List;
+
+/**
+ * An interface corresponding to row-level info from events on tabular data 
structures.
+ */
+public interface RowEventInfo<RowEventDataType> extends TableEventInfo {
+
+    List<RowEventDataType> getRows();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java
new file mode 100644
index 0000000..7754225
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import java.util.List;
+
+/**
+ * An interface for methods related to events that occur on tabular data 
structures.
+ */
+public interface TableEventInfo extends EventInfo {
+
+    String getDatabaseName();
+    String getTableName();
+    Long getTableId();
+    List<ColumnDefinition> getColumns();
+    ColumnDefinition getColumnByIndex(int i);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
new file mode 100644
index 0000000..d59306b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A POJO for holding table information related to update events.
+ */
+public class TableInfo {
+
+    final static String DB_TABLE_NAME_DELIMITER = "@!@";
+
+    private String databaseName;
+    private String tableName;
+    private Long tableId;
+    private List<ColumnDefinition> columns;
+
+    public TableInfo(String databaseName, String tableName, Long tableId, 
List<ColumnDefinition> columns) {
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.tableId = tableId;
+        this.columns = columns;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public Long getTableId() {
+        return tableId;
+    }
+
+    public List<ColumnDefinition> getColumns() {
+        return columns;
+    }
+
+    public void setColumns(List<ColumnDefinition> columns) {
+        this.columns = columns;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TableInfo that = (TableInfo) o;
+
+        return new EqualsBuilder()
+                .append(databaseName, that.databaseName)
+                .append(tableName, that.tableName)
+                .append(tableId, that.tableId)
+                .append(columns, that.columns)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        int result = databaseName.hashCode();
+        result = 31 * result + tableName.hashCode();
+        result = 31 * result + tableId.hashCode();
+        result = 31 * result + (columns != null ? columns.hashCode() : 0);
+        return result;
+    }
+
+    public static class Serializer implements 
org.apache.nifi.distributed.cache.client.Serializer<TableInfo> {
+
+        @Override
+        public void serialize(TableInfo value, OutputStream output) throws 
SerializationException, IOException {
+            StringBuilder sb = new StringBuilder(value.getDatabaseName());
+            sb.append(DB_TABLE_NAME_DELIMITER);
+            sb.append(value.getTableName());
+            sb.append(DB_TABLE_NAME_DELIMITER);
+            sb.append(value.getTableId());
+            List<ColumnDefinition> columnDefinitions = value.getColumns();
+            if (columnDefinitions != null && !columnDefinitions.isEmpty()) {
+                sb.append(DB_TABLE_NAME_DELIMITER);
+                sb.append(columnDefinitions.stream().map((col) -> 
col.getName() + DB_TABLE_NAME_DELIMITER + 
col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER)));
+            }
+            output.write(sb.toString().getBytes());
+        }
+    }
+
+    public static class Deserializer implements 
org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> {
+
+        @Override
+        public TableInfo deserialize(byte[] input) throws 
DeserializationException, IOException {
+            // Don't bother deserializing if empty, just return null. This 
usually happens when the key is not found in the cache
+            if (input == null || input.length == 0) {
+                return null;
+            }
+            String inputString = new String(input);
+            String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER);
+            int numTokens = tokens.length;
+            if (numTokens < 3) {
+                throw new IOException("Could not deserialize TableInfo from 
the following value: " + inputString);
+            }
+            String dbName = tokens[0];
+            String tableName = tokens[1];
+            Long tableId;
+            try {
+                tableId = Long.parseLong(tokens[2]);
+            } catch (NumberFormatException nfe) {
+                throw new IOException("Illegal table ID: " + tokens[2]);
+            }
+            // Parse column names and types
+            List<ColumnDefinition> columnDefinitions = new ArrayList<>();
+            for (int i = 0; i < numTokens - 3; i += 2) {
+                try {
+                    int columnTypeIndex = i + 4;
+                    int columnNameIndex = i + 3;
+                    if (columnTypeIndex < numTokens) {
+                        columnDefinitions.add(new 
ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), 
tokens[columnNameIndex]));
+                    } else {
+                        throw new IOException("No type detected for column: " 
+ tokens[columnNameIndex]);
+                    }
+                } catch (NumberFormatException nfe) {
+                    throw new IOException("Illegal column type value for 
column " + (i / 2 + 1) + ": " + tokens[i + 4]);
+                }
+            }
+
+            return new TableInfo(dbName, tableName, tableId, 
columnDefinitions);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
new file mode 100644
index 0000000..2f185e9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER;
+
+/**
+ * This class represents a key in a cache that contains information (column 
definitions, e.g.) for a database table
+ */
+public class TableInfoCacheKey {
+
+    private final String databaseName;
+    private final String tableName;
+    private final long tableId;
+    private final String uuidPrefix;
+
+    public TableInfoCacheKey(String uuidPrefix, String databaseName, String 
tableName, long tableId) {
+        this.uuidPrefix = uuidPrefix;
+        this.databaseName = databaseName;
+        this.tableName = tableName;
+        this.tableId = tableId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TableInfoCacheKey that = (TableInfoCacheKey) o;
+
+        return new EqualsBuilder()
+                .append(tableId, that.tableId)
+                .append(databaseName, that.databaseName)
+                .append(tableName, that.tableName)
+                .append(uuidPrefix, that.uuidPrefix)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        int result = databaseName != null ? databaseName.hashCode() : 0;
+        result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
+        result = 31 * result + (int) (tableId ^ (tableId >>> 32));
+        result = 31 * result + (uuidPrefix != null ? uuidPrefix.hashCode() : 
0);
+        return result;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public long getTableId() {
+        return tableId;
+    }
+
+    public String getUuidPrefix() {
+        return uuidPrefix;
+    }
+
+    public static class Serializer implements 
org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> {
+
+        @Override
+        public void serialize(TableInfoCacheKey key, OutputStream output) 
throws SerializationException, IOException {
+            StringBuilder sb = new StringBuilder(key.getUuidPrefix());
+            sb.append(DB_TABLE_NAME_DELIMITER);
+            sb.append(key.getDatabaseName());
+            sb.append(DB_TABLE_NAME_DELIMITER);
+            sb.append(key.getTableName());
+            sb.append(DB_TABLE_NAME_DELIMITER);
+            sb.append(key.getTableId());
+            output.write(sb.toString().getBytes());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
new file mode 100644
index 0000000..c805d3c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.cdc.event.EventInfo;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An abstract class that outputs common information (event type, timestamp, 
e.g.) about CDC events.
+ */
+public abstract class AbstractEventWriter<T extends EventInfo> implements 
EventWriter<T> {
+
+    private final JsonFactory JSON_FACTORY = new JsonFactory();
+    protected JsonGenerator jsonGenerator;
+
+    // Common method to create a JSON generator and start the root object. 
Should be called by sub-classes unless they need their own generator and such.
+    protected void startJson(OutputStream outputStream, T event) throws 
IOException {
+        jsonGenerator = createJsonGenerator(outputStream);
+        jsonGenerator.writeStartObject();
+        String eventType = event.getEventType();
+        if (eventType == null) {
+            jsonGenerator.writeNullField("type");
+        } else {
+            jsonGenerator.writeStringField("type", eventType);
+        }
+        Long timestamp = event.getTimestamp();
+        if (timestamp == null) {
+            jsonGenerator.writeNullField("timestamp");
+        } else {
+            jsonGenerator.writeNumberField("timestamp", event.getTimestamp());
+        }
+    }
+
+    protected void endJson() throws IOException {
+        if (jsonGenerator == null) {
+            throw new IOException("endJson called without a JsonGenerator");
+        }
+        jsonGenerator.writeEndObject();
+        jsonGenerator.flush();
+        jsonGenerator.close();
+    }
+
+    private JsonGenerator createJsonGenerator(OutputStream out) throws 
IOException {
+        return JSON_FACTORY.createGenerator(out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
new file mode 100644
index 0000000..096e3c1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.EventInfo;
+import org.apache.nifi.processor.Relationship;
+
+/**
+ * An interface to write an event to the process session. Note that a single 
event may produce multiple flow files.
+ */
+public interface EventWriter<T extends EventInfo> {
+
+    String APPLICATION_JSON = "application/json";
+    String SEQUENCE_ID_KEY = "cdc.sequence.id";
+    String CDC_EVENT_TYPE_ATTRIBUTE = "cdc.event.type";
+
+    /**
+     * Writes the given event to the process session, possibly via 
transferring it to the specified relationship (usually used for success)
+     *
+     * @param session           The session to write the event to
+     * @param transitUri        The URI indicating the source MySQL system 
from which the specified event is associated
+     * @param eventInfo         The event data
+     * @param currentSequenceId the current sequence ID
+     * @param relationship      A relationship to transfer any flowfile(s) to
+     * @return a sequence ID, usually incremented from the specified current 
sequence id by the number of flow files transferred and/or committed
+     */
+    long writeEvent(final ProcessSession session, String transitUri, final T 
eventInfo, final long currentSequenceId, Relationship relationship);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml
new file mode 100644
index 0000000..a42b49d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-mysql-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-cdc-mysql-nar</artifactId>
+    <packaging>nar</packaging>
+    <description>NiFi MySQL Change Data Capture (CDC) NAR</description>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-mysql-processors</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..7d59e9d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,5 @@
+nifi-cdc-mysql-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
new file mode 100644
index 0000000..13aea10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+license agreements. See the NOTICE file distributed with this work for 
additional
+information regarding copyright ownership. The ASF licenses this file to
+You under the Apache License, Version 2.0 (the "License"); you may not use
+this file except in compliance with the License. You may obtain a copy of
+the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+by applicable law or agreed to in writing, software distributed under the
+License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+OF ANY KIND, either express or implied. See the License for the specific
+language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-cdc-mysql-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-cdc-mysql-processors</artifactId>
+    <packaging>jar</packaging>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-cdc-api</artifactId>
+            <version>1.2.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.shyiko</groupId>
+            <artifactId>mysql-binlog-connector-java</artifactId>
+            <version>0.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <version>1.2.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
new file mode 100644
index 0000000..7089cda
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.BaseEventInfo;
+
+/**
+ * A base class for all MYSQL binlog events
+ */
+public class BaseBinlogEventInfo extends BaseEventInfo implements 
BinlogEventInfo {
+
+    private String binlogFilename;
+    private Long binlogPosition;
+
+    public BaseBinlogEventInfo(String eventType, Long timestamp, String 
binlogFilename, Long binlogPosition) {
+        super(eventType, timestamp);
+        this.binlogFilename = binlogFilename;
+        this.binlogPosition = binlogPosition;
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+
+    public Long getBinlogPosition() {
+        return binlogPosition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
new file mode 100644
index 0000000..4796947
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.BaseRowEventInfo;
+import org.apache.nifi.cdc.event.RowEventInfo;
+import org.apache.nifi.cdc.event.TableInfo;
+
+import java.util.BitSet;
+import java.util.List;
+
+/**
+ * A base class to help store information about a row mutation event (UPDATE, 
DELETE, etc.)
+ */
+public class BaseBinlogRowEventInfo<RowEventDataType> extends 
BaseBinlogTableEventInfo implements RowEventInfo<RowEventDataType> {
+
+    private BitSet includedColumns;
+    private RowEventInfo<RowEventDataType> delegate;
+
+    public BaseBinlogRowEventInfo(TableInfo tableInfo, String type, Long 
timestamp, String binlogFilename, Long binlogPosition, BitSet includedColumns, 
List<RowEventDataType> rows) {
+        super(tableInfo, type, timestamp, binlogFilename, binlogPosition);
+        this.includedColumns = includedColumns;
+        this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, 
rows);
+    }
+
+    public BitSet getIncludedColumns() {
+        return includedColumns;
+    }
+
+    @Override
+    public List<RowEventDataType> getRows() {
+        return delegate.getRows();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
new file mode 100644
index 0000000..763d695
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.BaseTableEventInfo;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.event.TableEventInfo;
+import org.apache.nifi.cdc.event.TableInfo;
+
+import java.util.List;
+
+/**
+ * A base class to handle data common to binlog table events, such as database 
name, table name, etc.
+ */
+public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements 
BinlogTableEventInfo {
+
+    private TableEventInfo delegate;
+
+    public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, 
Long timestamp, String binlogFilename, Long binlogPosition) {
+        super(eventType, timestamp, binlogFilename, binlogPosition);
+        this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, 
timestamp);
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return delegate.getDatabaseName();
+    }
+
+    @Override
+    public String getTableName() {
+        return delegate.getTableName();
+    }
+
+    @Override
+    public Long getTableId() {
+        return delegate.getTableId();
+    }
+
+    @Override
+    public List<ColumnDefinition> getColumns() {
+        return delegate.getColumns();
+    }
+
+    @Override
+    public ColumnDefinition getColumnByIndex(int i) {
+        return delegate.getColumnByIndex(i);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
new file mode 100644
index 0000000..adf96be
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+/**
+ * An event implementation corresponding to the beginning of a MySQL 
transaction (update rows, e.g.)
+ */
+public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
+
+    public BeginTransactionEventInfo(Long timestamp, String binlogFilename, 
long binlogPosition) {
+        super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
new file mode 100644
index 0000000..6398d4f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.EventInfo;
+
+/**
+ * An interface for MYSQL binlog-specific events.
+ */
+public interface BinlogEventInfo extends EventInfo {
+
+    String BINLOG_FILENAME_KEY = "binlog.filename";
+    String BINLOG_POSITION_KEY = "binlog.position";
+
+    String getBinlogFilename();
+
+    Long getBinlogPosition();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
new file mode 100644
index 0000000..343cced
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * An event listener wrapper for MYSQL binlog events generated from the 
mysql-binlog-connector.
+ */
+public class BinlogEventListener implements BinaryLogClient.EventListener {
+
+    protected final AtomicBoolean stopNow = new AtomicBoolean(false);
+    private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
+
+    private final BlockingQueue<RawBinlogEvent> queue;
+    private final BinaryLogClient client;
+
+    public BinlogEventListener(BinaryLogClient client, 
BlockingQueue<RawBinlogEvent> q) {
+        this.client = client;
+        this.queue = q;
+    }
+
+    public void start() {
+        stopNow.set(false);
+    }
+
+    public void stop() {
+        stopNow.set(true);
+    }
+
+    @Override
+    public void onEvent(Event event) {
+        while (!stopNow.get()) {
+            RawBinlogEvent ep = new RawBinlogEvent(event, 
client.getBinlogFilename());
+            try {
+                if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, 
TimeUnit.MILLISECONDS)) {
+                    return;
+                } else {
+                    throw new RuntimeException("Unable to add event to the 
queue");
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Interrupted while adding event to 
the queue");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java
new file mode 100644
index 0000000..50796e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * An listener wrapper for mysql-binlog-connector lifecycle events.
+ */
+public class BinlogLifecycleListener implements 
BinaryLogClient.LifecycleListener {
+
+    AtomicReference<BinaryLogClient> client = new AtomicReference<>(null);
+    AtomicReference<Exception> exception = new AtomicReference<>(null);
+
+    @Override
+    public void onConnect(BinaryLogClient binaryLogClient) {
+        client.set(binaryLogClient);
+        exception.set(null);
+    }
+
+    @Override
+    public void onCommunicationFailure(BinaryLogClient binaryLogClient, 
Exception e) {
+        client.set(binaryLogClient);
+        exception.set(e);
+    }
+
+    @Override
+    public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, 
Exception e) {
+        client.set(binaryLogClient);
+        exception.set(e);
+    }
+
+    @Override
+    public void onDisconnect(BinaryLogClient binaryLogClient) {
+        client.set(binaryLogClient);
+    }
+
+    public BinaryLogClient getClient() {
+        return client.get();
+    }
+
+    public Exception getException() {
+        return exception.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java
new file mode 100644
index 0000000..43a50ad
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.TableEventInfo;
+
+/**
+ * A marker interface for those classes wishing to implement binlog-specific 
methods as well as table-generic methods (getDatabase, e.g.)
+ */
+public interface BinlogTableEventInfo extends BinlogEventInfo, TableEventInfo {
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
new file mode 100644
index 0000000..96a84d3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+
+/**
+ * An event implementation corresponding to the beginning of a MySQL 
transaction (update rows, e.g.)
+ */
+public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
+
+    public CommitTransactionEventInfo(Long timestamp, String binlogFilename, 
long binlogPosition) {
+        super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
new file mode 100644
index 0000000..3b538ec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+
+import java.io.Serializable;
+
+/**
+ * This class represents information about rows deleted from a MySQL table
+ */
+public class DeleteRowsEventInfo extends 
BaseBinlogRowEventInfo<Serializable[]> {
+
+    public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, Long binlogPosition, DeleteRowsEventData data) {
+        super(tableInfo, DELETE_EVENT, timestamp, binlogFilename, 
binlogPosition, data.getIncludedColumns(), data.getRows());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
new file mode 100644
index 0000000..0dd7f12
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+
+import java.io.Serializable;
+
+/**
+ * This class represents information about rows written/added to a MySQL table
+ */
+public class InsertRowsEventInfo extends 
BaseBinlogRowEventInfo<Serializable[]> {
+
+    private WriteRowsEventData data;
+
+    public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, Long binlogPosition, WriteRowsEventData data) {
+        super(tableInfo, WRITE_EVENT, timestamp, binlogFilename, 
binlogPosition, data.getIncludedColumns(), data.getRows());
+        this.data = data;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
new file mode 100644
index 0000000..313bfce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import java.io.Serializable;
+
+/**
+ * A utility class to provide MySQL- / binlog-specific constants and methods 
for processing events and data
+ */
+public class MySQLCDCUtils {
+
+    public static Object getWritableObject(Integer type, Serializable value) {
+        if (value == null) {
+            return null;
+        }
+        if (type == null) {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else if (value instanceof Number) {
+                return value;
+            }
+        } else if (value instanceof Number) {
+            return value;
+        } else {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else {
+                return value.toString();
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java
new file mode 100644
index 0000000..c3ccabf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.event.Event;
+
+/**
+ * An object holder for raw binlog events
+ */
+public class RawBinlogEvent {
+
+    private Event event;
+    private String binlogFilename;
+
+    public RawBinlogEvent(Event event, String binlogFilename) {
+        this.event = event;
+        this.binlogFilename = binlogFilename;
+    }
+
+    public Event getEvent() {
+        return event;
+    }
+
+    public void setEvent(Event event) {
+        this.event = event;
+    }
+
+    public String getBinlogFilename() {
+        return binlogFilename;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
new file mode 100644
index 0000000..a385b11
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import org.apache.nifi.cdc.event.TableEventInfo;
+import org.apache.nifi.cdc.event.TableInfo;
+
+
+/**
+ * An event class corresponding to table schema changes (add/drop column, 
add/drop table, etc.)
+ */
+public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements 
TableEventInfo {
+
+    private String query;
+
+    public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, long binlogPosition, String query) {
+        super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, 
binlogPosition);
+        this.query = query;
+    }
+
+    public String getQuery() {
+        return query;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
new file mode 100644
index 0000000..2e20dc6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Map;
+
+/**
+ * This class represents information about rows written/added to a MySQL table
+ */
+public class UpdateRowsEventInfo extends 
BaseBinlogRowEventInfo<Map.Entry<Serializable[], Serializable[]>> {
+
+    private BitSet includedColumnsBeforeUpdate;
+
+    public UpdateRowsEventInfo(TableInfo tableInfo, Long timestamp, String 
binlogFilename, Long binlogPosition, UpdateRowsEventData data) {
+        super(tableInfo, UPDATE_EVENT, timestamp, binlogFilename, 
binlogPosition, data.getIncludedColumns(), data.getRows());
+        includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate();
+    }
+
+    public BitSet getIncludedColumnsBeforeUpdate() {
+        return includedColumnsBeforeUpdate;
+    }
+}

Reply via email to