http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
new file mode 100644
index 0000000..df4424c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.io.AbstractEventWriter;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An abstract base class for writing MySQL binlog events into flow file(s).
+ */
+public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> extends AbstractEventWriter<T> {
+
+    protected void writeJson(T event) throws IOException {
+        jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
+        jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+    }
+
+    protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
+        return new HashMap<String, String>() {
+            {
+                put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+                put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+                put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+                put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+                put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+            }
+        };
+    }
+
+    // Default implementation for binlog events
+    @Override
+    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, (outputStream) -> {
+            super.startJson(outputStream, eventInfo);
+            writeJson(eventInfo);
+            // Nothing in the body
+            super.endJson();
+        });
+        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
+        session.transfer(flowFile, relationship);
+        session.getProvenanceReporter().receive(flowFile, transitUri);
+        return currentSequenceId + 1;
+    }
+}

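For a transaction-level event (begin/commit), the default writeEvent() above yields a JSON body containing just the binlog coordinates written by writeJson(); startJson()/endJson() in the parent AbstractEventWriter open and close the object and may contribute common fields not shown in this diff. A sketch with illustrative values:

    {
      "binlog_filename": "mysql-bin.000001",
      "binlog_position": 4
    }

The flow file itself carries the attributes assembled by getCommonAttributes(): the sequence ID, the CDC event type, the binlog filename and position, and a mime.type of application/json.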
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
new file mode 100644
index 0000000..a674386
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
+
+import java.io.IOException;
+
+/**
+ * An abstract base class for writing MySQL table-related binlog events into flow file(s).
+ */
+public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
+
+    protected void writeJson(T event) throws IOException {
+        super.writeJson(event);
+        if (event.getDatabaseName() != null) {
+            jsonGenerator.writeStringField("database", event.getDatabaseName());
+        } else {
+            jsonGenerator.writeNullField("database");
+        }
+        if (event.getTableName() != null) {
+            jsonGenerator.writeStringField("table_name", event.getTableName());
+        } else {
+            jsonGenerator.writeNullField("table_name");
+        }
+        if (event.getTableId() != null) {
+            jsonGenerator.writeNumberField("table_id", event.getTableId());
+        } else {
+            jsonGenerator.writeNullField("table_id");
+        }
+    }
+
+    // Default implementation for table-related binlog events
+    @Override
+    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, (outputStream) -> {
+            super.startJson(outputStream, eventInfo);
+            writeJson(eventInfo);
+            // Nothing in the body
+            super.endJson();
+        });
+        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
+        session.transfer(flowFile, relationship);
+        session.getProvenanceReporter().receive(flowFile, transitUri);
+        return currentSequenceId + 1;
+    }
+}

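A subclass of this writer adds the table-related fields on top of the base binlog coordinates, writing explicit JSON nulls when the database name, table name, or table ID is unavailable. A sketch with illustrative values:

    {
      "binlog_filename": "mysql-bin.000001",
      "binlog_position": 223,
      "database": "mydb",
      "table_name": "users",
      "table_id": 108
    }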
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
new file mode 100644
index 0000000..b1e3511
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/BeginTransactionEventWriter.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
+
+/**
+ * A writer for events corresponding to the beginning of a MySQL transaction
+ */
+public class BeginTransactionEventWriter extends AbstractBinlogEventWriter<BeginTransactionEventInfo> {
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
new file mode 100644
index 0000000..58b77a9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+
+/**
+ * A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
+ */
+public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
new file mode 100644
index 0000000..7e316d4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A writer class to output MySQL binlog "delete rows" events to flow file(s).
+ */
+public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> {
+
+    /**
+     * Creates and transfers a new flow file for each row in the specified event; the contents are the JSON-serialized value of the row, and the sequence ID attribute is set accordingly
+     *
+     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
+     * @param eventInfo An event whose value will become the contents of the flow file
+     * @return The next available CDC sequence ID for use by the CDC processor
+     */
+    @Override
+    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
+        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+        for (Serializable[] row : eventInfo.getRows()) {
+
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, outputStream -> {
+
+                super.startJson(outputStream, eventInfo);
+                super.writeJson(eventInfo);
+
+                final BitSet bitSet = eventInfo.getIncludedColumns();
+                writeRow(eventInfo, row, bitSet);
+
+                super.endJson();
+            });
+
+            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
+            session.transfer(flowFile, relationship);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+            seqId.getAndIncrement();
+        }
+        return seqId.get();
+    }
+
+    protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
+        jsonGenerator.writeArrayFieldStart("columns");
+        int i = includedColumns.nextSetBit(0);
+        while (i != -1) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeNumberField("id", i + 1);
+            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
+            Integer columnType = null;
+            if (columnDefinition != null) {
+                jsonGenerator.writeStringField("name", columnDefinition.getName());
+                columnType = columnDefinition.getType();
+                jsonGenerator.writeNumberField("column_type", columnType);
+            }
+            if (row[i] == null) {
+                jsonGenerator.writeNullField("value");
+            } else {
+                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
+            }
+            jsonGenerator.writeEndObject();
+            i = includedColumns.nextSetBit(i + 1);
+        }
+        jsonGenerator.writeEndArray();
+    }
+}

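DeleteRowsWriter emits one flow file per deleted row, each containing a "columns" array built by writeRow(). The "name" and "column_type" fields appear only when a ColumnDefinition could be resolved for the column index. A sketch with illustrative names and values:

    "columns": [
      {"id": 1, "name": "id", "column_type": 4, "value": 42},
      {"id": 2, "name": "email", "column_type": 12, "value": null}
    ]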
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
new file mode 100644
index 0000000..6cdfbdd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A writer class to output MySQL binlog "write rows" (aka INSERT) events to flow file(s).
+ */
+public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {
+
+    /**
+     * Creates and transfers a new flow file for each row in the specified event; the contents are the JSON-serialized value of the row, and the sequence ID attribute is set accordingly
+     *
+     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
+     * @param eventInfo An event whose value will become the contents of the flow file
+     * @return The next available CDC sequence ID for use by the CDC processor
+     */
+    @Override
+    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
+        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+        for (Serializable[] row : eventInfo.getRows()) {
+
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, outputStream -> {
+
+                super.startJson(outputStream, eventInfo);
+                super.writeJson(eventInfo);
+
+                final BitSet bitSet = eventInfo.getIncludedColumns();
+                writeRow(eventInfo, row, bitSet);
+
+                super.endJson();
+            });
+
+            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
+            session.transfer(flowFile, relationship);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+            seqId.getAndIncrement();
+        }
+        return seqId.get();
+    }
+
+    protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
+        jsonGenerator.writeArrayFieldStart("columns");
+        int i = includedColumns.nextSetBit(0);
+        while (i != -1) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeNumberField("id", i + 1);
+            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
+            Integer columnType = null;
+            if (columnDefinition != null) {
+                jsonGenerator.writeStringField("name", columnDefinition.getName());
+                columnType = columnDefinition.getType();
+                jsonGenerator.writeNumberField("column_type", columnType);
+            }
+            if (row[i] == null) {
+                jsonGenerator.writeNullField("value");
+            } else {
+                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
+            }
+            jsonGenerator.writeEndObject();
+            i = includedColumns.nextSetBit(i + 1);
+        }
+        jsonGenerator.writeEndArray();
+    }
+}

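The insert case mirrors the delete case above: one flow file per inserted row, with the same "columns" array shape produced by writeRow() (illustrative values):

    "columns": [
      {"id": 1, "name": "id", "column_type": 4, "value": 43},
      {"id": 2, "name": "email", "column_type": 12, "value": "user@example.com"}
    ]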
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
new file mode 100644
index 0000000..fe31c07
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
+
+/**
+ * A writer class to output MySQL binlog "schema change" (e.g. ALTER TABLE) events to flow file(s).
+ */
+public class SchemaChangeEventWriter extends AbstractBinlogTableEventWriter<SchemaChangeEventInfo> {
+
+    @Override
+    public long writeEvent(ProcessSession session, String transitUri, SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, (outputStream) -> {
+            super.startJson(outputStream, eventInfo);
+            super.writeJson(eventInfo);
+            jsonGenerator.writeStringField("query", eventInfo.getQuery());
+            super.endJson();
+        });
+        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
+        session.transfer(flowFile, relationship);
+        session.getProvenanceReporter().receive(flowFile, transitUri);
+        return currentSequenceId + 1;
+    }
+}

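SchemaChangeEventWriter extends the table-event body with the raw DDL statement via jsonGenerator.writeStringField("query", ...). A sketch with illustrative values:

    {
      "binlog_filename": "mysql-bin.000001",
      "binlog_position": 520,
      "database": "mydb",
      "table_name": "users",
      "table_id": null,
      "query": "ALTER TABLE users ADD COLUMN email VARCHAR(255)"
    }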
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
new file mode 100644
index 0000000..a4934fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A writer class to output MySQL binlog "update rows" (aka UPDATE) events to flow file(s).
+ */
+public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> {
+
+    /**
+     * Creates and transfers a new flow file for each updated row in the specified event; the contents are the JSON-serialized value of the row, and the sequence ID attribute is set accordingly
+     *
+     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
+     * @param eventInfo An event whose value will become the contents of the flow file
+     * @return The next available CDC sequence ID for use by the CDC processor
+     */
+    @Override
+    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
+        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+        for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {
+
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, outputStream -> {
+
+                super.startJson(outputStream, eventInfo);
+                super.writeJson(eventInfo);
+
+                final BitSet bitSet = eventInfo.getIncludedColumns();
+                writeRow(eventInfo, row, bitSet);
+
+                super.endJson();
+            });
+
+            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
+            session.transfer(flowFile, relationship);
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+            seqId.getAndIncrement();
+        }
+        return seqId.get();
+    }
+
+    protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
+
+        jsonGenerator.writeArrayFieldStart("columns");
+        int i = includedColumns.nextSetBit(0);
+        while (i != -1) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeNumberField("id", i + 1);
+            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
+            Integer columnType = null;
+            if (columnDefinition != null) {
+                jsonGenerator.writeStringField("name", columnDefinition.getName());
+                columnType = columnDefinition.getType();
+                jsonGenerator.writeNumberField("column_type", columnType);
+            }
+            Serializable[] oldRow = row.getKey();
+            Serializable[] newRow = row.getValue();
+
+            if (oldRow[i] == null) {
+                jsonGenerator.writeNullField("last_value");
+            } else {
+                jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
+            }
+
+            if (newRow[i] == null) {
+                jsonGenerator.writeNullField("value");
+            } else {
+                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
+            }
+            jsonGenerator.writeEndObject();
+            i = includedColumns.nextSetBit(i + 1);
+        }
+        jsonGenerator.writeEndArray();
+    }
+}

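For updates, each row entry is a Map.Entry pairing the before image (getKey()) and after image (getValue()), so every column object carries both a "last_value" and a "value" field. A sketch with illustrative values:

    "columns": [
      {"id": 2, "name": "email", "column_type": 12, "last_value": "old@example.com", "value": "new@example.com"}
    ]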
http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
new file mode 100644
index 0000000..f135a91
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -0,0 +1,1033 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors;
+
+import com.github.shyiko.mysql.binlog.BinaryLogClient;
+import com.github.shyiko.mysql.binlog.event.Event;
+import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
+import com.github.shyiko.mysql.binlog.event.EventType;
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import com.github.shyiko.mysql.binlog.event.RotateEventData;
+import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.cdc.CDCException;
+import org.apache.nifi.cdc.event.ColumnDefinition;
+import org.apache.nifi.cdc.event.RowEventException;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.TableInfoCacheKey;
+import org.apache.nifi.cdc.event.io.EventWriter;
+import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
+import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
+import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
+import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
+import org.apache.nifi.cdc.mysql.event.io.SchemaChangeEventWriter;
+import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+
+import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.FORMAT_DESCRIPTION;
+import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_DELETE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
+import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
+import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
+
+
+/**
+ * A processor to retrieve Change Data Capture (CDC) events and send them as flow files.
+ */
+@TriggerSerially
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"sql", "jdbc", "cdc", "mysql"})
+@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
+        + "are output as individual flow files ordered by the time at which the operation occurred.")
+@Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such "
+        + "that it can continue from the same location if restarted.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "cdc.sequence.id", description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
+                + "of the CDC event flow file relative to the other event flow file(s)."),
+        @WritesAttribute(attribute = "cdc.event.type", description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
+                + "'begin', 'write', 'update', 'delete', 'schema_change' and 'commit'."),
+        @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
+                + "application/json")
+})
+public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
+
+    // Random invalid constant used as an indicator to not set the binlog position on the client (thereby using the latest available)
+    private static final int DO_NOT_SET = -1000;
+
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Successfully created FlowFile from a CDC event.")
+            .build();
+
+    protected static Set<Relationship> relationships;
+
+    // Properties
+    public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-db-name-pattern")
+            .displayName("Database/Schema Name Pattern")
+            .description("A regular expression (regex) for matching databases or schemas (depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
+                    + "the schema name as it is stored in the database. If the property is not set, the schema name will not be used to filter the CDC events.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TABLE_NAME_PATTERN = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-name-pattern")
+            .displayName("Table Name Pattern")
+            .description("A regular expression (regex) for matching the tables affected by CDC events. The regex must match the table name as it is stored in the database. "
+                    + "If the property is not set, no events will be filtered based on table name.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-max-wait-time")
+            .displayName("Max Wait Time")
+            .description("The maximum amount of time allowed for a connection to be established, zero means there is effectively no limit.")
+            .defaultValue("30 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-hosts")
+            .displayName("MySQL Hosts")
+            .description("A list of hostname/port entries corresponding to nodes in a MySQL cluster. The entries should be comma-separated, "
+                    + "with each entry using a colon to separate hostname and port, such as host1:port,host2:port,....  For example mysql.myhost.com:3306. This processor will attempt to connect to "
+                    + "the hosts in the list in order. If one node goes down and failover is enabled for the cluster, then the processor will connect "
+                    + "to the active node (assuming its host entry is specified in this property). The default port for MySQL connections is 3306.")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_NAME = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-driver-class")
+            .displayName("MySQL Driver Class Name")
+            .description("The class name of the MySQL database driver class")
+            .defaultValue("com.mysql.jdbc.Driver")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DRIVER_LOCATION = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-driver-locations")
+            .displayName("MySQL Driver Location(s)")
+            .description("Comma-separated list of files/folders and/or URLs containing the MySQL driver JAR and its dependencies (if any). "
+                    + "For example '/var/tmp/mysql-connector-java-5.1.38-bin.jar'")
+            .defaultValue(null)
+            .required(false)
+            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-username")
+            .displayName("Username")
+            .description("Username to access the MySQL cluster")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-password")
+            .displayName("Password")
+            .description("Password to access the MySQL cluster")
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-server-id")
+            .displayName("Server ID")
+            .description("The client connecting to the MySQL replication group is actually a simplified slave (server), and the Server ID value must be unique across the whole replication "
+                    + "group (i.e. different from any other Server ID being used by any master or slave). Thus, each instance of CaptureChangeMySQL must have a Server ID unique across "
+                    + "the replication group. If the Server ID is not specified, it defaults to 65535.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-dist-map-cache-client")
+            .displayName("Distributed Map Cache Client")
+            .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various tables, columns, etc. "
+                    + "needed by the processor. If a client is not specified, the generated events will not include column type or name information.")
+            .identifiesControllerService(DistributedMapCacheClient.class)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor RETRIEVE_ALL_RECORDS = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-retrieve-all-records")
+            .displayName("Retrieve All Records")
+            .description("Specifies whether to get all available CDC events, regardless of the current binlog filename and/or position. If binlog filename and position values are present "
+                    + "in the processor's State, this property's value is ignored. This allows for 4 different configurations: 1) If binlog data is available in processor State, that is used "
+                    + "to determine the start location and the value of Retrieve All Records is ignored. 2) If no binlog data is in processor State, then Retrieve All Records set to true "
+                    + "means start at the beginning of the binlog history. 3) If no binlog data is in processor State and Initial Binlog Filename/Position are not set, then "
+                    + "Retrieve All Records set to false means start at the end of the binlog history. 4) If no binlog data is in processor State and Initial Binlog Filename/Position "
+                    + "are set, then Retrieve All Records set to false means start at the specified initial binlog file/position. "
+                    + "To reset the behavior, clear the processor state (refer to the State Management section of the processor's documentation).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-state-update-interval")
+            .displayName("State Update Interval")
+            .description("Indicates how often to update the processor's state with binlog file/position values. A value of zero means that state will only be updated when the processor is "
+                    + "stopped or shut down. If at some point the processor state does not contain the desired binlog values, the last flow file emitted will contain the last observed values, "
+                    + "and the processor can be returned to that state by using the Initial Binlog File, Initial Binlog Position, and Initial Sequence ID properties.")
+            .defaultValue("0 seconds")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor INIT_SEQUENCE_ID = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-init-seq-id")
+            .displayName("Initial Sequence ID")
+            .description("Specifies an initial sequence identifier to use if this processor's State does not have a current "
+                    + "sequence identifier. If a sequence identifier is present in the processor's State, this property is ignored. Sequence identifiers are "
+                    + "monotonically increasing integers that record the order of flow files generated by the processor. They can be used with the EnforceOrder "
+                    + "processor to guarantee ordered delivery of CDC events.")
+            .required(false)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor INIT_BINLOG_FILENAME = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-init-binlog-filename")
+            .displayName("Initial Binlog Filename")
+            .description("Specifies an initial binlog filename to use if this processor's State does not have a current binlog filename. If a filename is present "
+                    + "in the processor's State, this property is ignored. This can be used along with Initial Binlog Position to \"skip ahead\" if previous events are not desired. "
+                    + "Note that NiFi Expression Language is supported, but this property is evaluated when the processor is configured, so FlowFile attributes may not be used. Expression "
+                    + "Language is supported to enable the use of the Variable Registry and/or environment properties.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor INIT_BINLOG_POSITION = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-init-binlog-position")
+            .displayName("Initial Binlog Position")
+            .description("Specifies an initial offset into a binlog (specified by Initial Binlog Filename) to use if this processor's State does not have a current "
+                    + "binlog filename. If a filename is present in the processor's State, this property is ignored. This can be used along with Initial Binlog Filename "
+                    + "to \"skip ahead\" if previous events are not desired. Note that NiFi Expression Language is supported, but this property is evaluated when the "
+                    + "processor is configured, so FlowFile attributes may not be used. Expression Language is supported to enable the use of the Variable Registry "
+                    + "and/or environment properties.")
+            .required(false)
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    private static List<PropertyDescriptor> propDescriptors;
+
+    private volatile ProcessSession currentSession;
+    private BinaryLogClient binlogClient;
+    private BinlogEventListener eventListener;
+    private BinlogLifecycleListener lifecycleListener;
+
+    private volatile LinkedBlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>();
+    private volatile String currentBinlogFile = null;
+    private volatile long currentBinlogPosition = 4;
+
+    // The following variables save the value of the binlog filename and position (and sequence id) at the beginning of a transaction. Used for rollback
+    private volatile String xactBinlogFile = null;
+    private volatile long xactBinlogPosition = 4;
+    private volatile long xactSequenceId = 0;
+
+    private volatile TableInfo currentTable = null;
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+
+    private volatile boolean inTransaction = false;
+    private volatile boolean skipTable = false;
+    private AtomicBoolean doStop = new AtomicBoolean(false);
+    private AtomicBoolean hasRun = new AtomicBoolean(false);
+
+    private int currentHost = 0;
+    private String transitUri = "<unknown>";
+
+    private volatile long lastStateUpdate = 0L;
+    private volatile long stateUpdateInterval = -1L;
+    private AtomicLong currentSequenceId = new AtomicLong(0);
+
+    private volatile DistributedMapCacheClient cacheClient = null;
+    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer();
+    private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
+    private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
+
+    private Connection jdbcConnection = null;
+
+    private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
+    private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
+    private final SchemaChangeEventWriter schemaChangeEventWriter = new SchemaChangeEventWriter();
+    private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+    private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
+    private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
+
+    static {
+
+        final Set<Relationship> r = new HashSet<>();
+        r.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(r);
+
+        final List<PropertyDescriptor> pds = new ArrayList<>();
+        pds.add(HOSTS);
+        pds.add(DRIVER_NAME);
+        pds.add(DRIVER_LOCATION);
+        pds.add(USERNAME);
+        pds.add(PASSWORD);
+        pds.add(SERVER_ID);
+        pds.add(DATABASE_NAME_PATTERN);
+        pds.add(TABLE_NAME_PATTERN);
+        pds.add(CONNECT_TIMEOUT);
+        pds.add(DIST_CACHE_CLIENT);
+        pds.add(RETRIEVE_ALL_RECORDS);
+        pds.add(STATE_UPDATE_INTERVAL);
+        pds.add(INIT_SEQUENCE_ID);
+        pds.add(INIT_BINLOG_FILENAME);
+        pds.add(INIT_BINLOG_POSITION);
+        propDescriptors = Collections.unmodifiableList(pds);
+    }
+
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propDescriptors;
+    }
+
+    public void setup(ProcessContext context) {
+
+        final ComponentLog logger = getLogger();
+
+        final StateManager stateManager = context.getStateManager();
+        final StateMap stateMap;
+
+        try {
+            stateMap = stateManager.getState(Scope.CLUSTER);
+        } catch (final IOException ioe) {
+            logger.error("Failed to retrieve observed maximum values from the State Manager. Will not attempt "
+                    + "connection until this is accomplished.", ioe);
+            context.yield();
+            return;
+        }
+
+        PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN);
+        databaseNamePattern = dbNameValue.isSet() ? Pattern.compile(dbNameValue.getValue()) : null;
+
+        PropertyValue tableNameValue = context.getProperty(TABLE_NAME_PATTERN);
+        tableNamePattern = tableNameValue.isSet() ? Pattern.compile(tableNameValue.getValue()) : null;
+
+        stateUpdateInterval = context.getProperty(STATE_UPDATE_INTERVAL).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+
+        boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
+
+        // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
+        currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
+        if (currentBinlogFile == null) {
+            if (!getAllRecords) {
+                if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
+                    currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+                }
+            } else {
+                // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
+                currentBinlogFile = "";
+            }
+        }
+
+        // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Position if no State variable is present
+        String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
+        if (binlogPosition != null) {
+            currentBinlogPosition = Long.valueOf(binlogPosition);
+        } else if (!getAllRecords) {
+            if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
+                currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+            } else {
+                currentBinlogPosition = DO_NOT_SET;
+            }
+        } else {
+            currentBinlogPosition = -1;
+        }
+
+        // Get current sequence ID from state
+        String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY);
+        if (StringUtils.isEmpty(seqIdString)) {
+            // Use Initial Sequence ID property if none is found in state
+            PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
+            if (seqIdProp.isSet()) {
+                currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger());
+            }
+        } else {
+            currentSequenceId.set(Integer.parseInt(seqIdString));
+        }
+
+        // Get a reference to the Distributed Cache if one exists. If it does not, no enrichment (e.g. resolution of column names) will be performed
+        boolean createEnrichmentConnection = false;
+        if (context.getProperty(DIST_CACHE_CLIENT).isSet()) {
+            cacheClient = context.getProperty(DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class);
+            createEnrichmentConnection = true;
+        } else {
+            logger.warn("No Distributed Map Cache Client is specified, so no 
event enrichment (resolution of column names, e.g.) will be performed.");
+            cacheClient = null;
+        }
+
+
+        // Save off the MySQL cluster and JDBC driver information; it will be used to connect for event enrichment as well as for the binlog connector
+        try {
+            List<InetSocketAddress> hosts = getHosts(context.getProperty(HOSTS).evaluateAttributeExpressions().getValue());
+
+            String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
+            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
+
+            // BinaryLogClient expects a non-null password, so set it to the empty string if it is not provided
+            if (password == null) {
+                password = "";
+            }
+
+            long connectTimeout = context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+
+            String driverLocation = context.getProperty(DRIVER_LOCATION).evaluateAttributeExpressions().getValue();
+            String driverName = context.getProperty(DRIVER_NAME).evaluateAttributeExpressions().getValue();
+
+            Long serverId = context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
+
+            connect(hosts, username, password, serverId, createEnrichmentConnection, driverLocation, driverName, connectTimeout);
+        } catch (IOException | IllegalStateException e) {
+            context.yield();
+            binlogClient = null;
+            throw new ProcessException(e.getMessage(), e);
+        }
+    }
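+
+    /*
+     * Illustrative summary of the start-position resolution performed in setup() above (a sketch
+     * with hypothetical scenarios, not normative documentation):
+     *   - State contains a stored binlog filename/position       -> resume from the stored values
+     *   - No state, Retrieve All Records = true                  -> filename "" and position -1 (start of all binlogs)
+     *   - No state, Initial Binlog Filename/Position are set     -> use those property values
+     *   - No state, nothing set                                  -> filename null, position DO_NOT_SET (connector defaults)
+     */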
+
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+        // Indicate that this processor has executed at least once, so we know whether or not the state values are valid and should be updated
+        hasRun.set(true);
+        ComponentLog log = getLogger();
+        StateManager stateManager = context.getStateManager();
+
+        // Create a client if we don't have one
+        if (binlogClient == null) {
+            setup(context);
+        }
+
+        // If the client has been disconnected, try to reconnect
+        if (!binlogClient.isConnected()) {
+            Exception e = lifecycleListener.getException();
+            // If there's no exception, the listener callback might not have been executed yet, so try again later. Otherwise clean up and start over next time
+            if (e != null) {
+                // Communications failure, disconnect and try next time
+                log.error("Binlog connector communications failure: " + 
e.getMessage(), e);
+                try {
+                    stop(stateManager);
+                } catch (CDCException ioe) {
+                    throw new ProcessException(ioe);
+                }
+            }
+
+            // Try again later
+            context.yield();
+            return;
+        }
+
+        if (currentSession == null) {
+            currentSession = sessionFactory.createSession();
+        }
+
+        try {
+            outputEvents(currentSession, stateManager, log);
+            long now = System.currentTimeMillis();
+            long timeSinceLastUpdate = now - lastStateUpdate;
+
+            if (stateUpdateInterval != 0 && timeSinceLastUpdate >= stateUpdateInterval) {
+                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+                lastStateUpdate = now;
+            }
+        } catch (IOException ioe) {
+            try {
+                // Perform some processor-level "rollback", then roll back the session
+                currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
+                currentBinlogPosition = xactBinlogPosition;
+                currentSequenceId.set(xactSequenceId);
+                inTransaction = false;
+                stop(stateManager);
+                queue.clear();
+                currentSession.rollback();
+            } catch (Exception e) {
+                // Not much we can recover from here
+                log.warn("Error occurred during rollback", e);
+            }
+            throw new ProcessException(ioe);
+        }
+    }
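+
+    // Note on the rollback handling above: xactBinlogFile, xactBinlogPosition and xactSequenceId
+    // are captured when a BEGIN event is seen (see outputEvents), so on failure the processor can
+    // rewind to the start of the interrupted transaction before rolling back the NiFi session.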
+
+    @OnStopped
+    public void onStopped(ProcessContext context) {
+        try {
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    @OnShutdown
+    public void onShutdown(ProcessContext context) {
+        try {
+            // In case we get shut down while still running, save off the current state, disconnect, and shut down gracefully
+            stop(context.getStateManager());
+        } catch (CDCException ioe) {
+            throw new ProcessException(ioe);
+        }
+    }
+
+    /**
+     * Gets a list of hosts from a comma-separated NiFi property value.
+     *
+     * @param hostsString A comma-separated list of hosts (host:port,host2:port2, etc.)
+     * @return A List of InetSocketAddresses for the hosts
+     */
+    private List<InetSocketAddress> getHosts(String hostsString) {
+
+        if (hostsString == null) {
+            return null;
+        }
+        final List<String> hostsSplit = Arrays.asList(hostsString.split(","));
+        List<InetSocketAddress> hostsList = new ArrayList<>();
+
+        for (String item : hostsSplit) {
+            String[] addresses = item.split(":");
+            if (addresses.length != 2) {
+                throw new ArrayIndexOutOfBoundsException("Not in host:port format");
+            }
+
+            hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim())));
+        }
+        return hostsList;
+    }
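+
+    // Illustrative usage of getHosts() (hypothetical addresses): getHosts("10.0.0.1:3306, 10.0.0.2:3306")
+    // yields two InetSocketAddress entries, while getHosts("10.0.0.1") throws
+    // ArrayIndexOutOfBoundsException because the entry is not in host:port format.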
+
+    protected void connect(List<InetSocketAddress> hosts, String username, String password, Long serverId, boolean createEnrichmentConnection,
+                           String driverLocation, String driverName, long connectTimeout) throws IOException {
+
+        int connectionAttempts = 0;
+        final int numHosts = hosts.size();
+        InetSocketAddress connectedHost = null;
+
+        while (connectedHost == null && connectionAttempts < numHosts) {
+            if (binlogClient == null) {
+
+                connectedHost = hosts.get(currentHost);
+                binlogClient = createBinlogClient(connectedHost.getHostString(), connectedHost.getPort(), username, password);
+            }
+
+            // Add an event listener and lifecycle listener for binlog and client events, respectively
+            if (eventListener == null) {
+                eventListener = createBinlogEventListener(binlogClient, queue);
+            }
+            eventListener.start();
+            binlogClient.registerEventListener(eventListener);
+
+            if (lifecycleListener == null) {
+                lifecycleListener = createBinlogLifecycleListener();
+            }
+            binlogClient.registerLifecycleListener(lifecycleListener);
+
+            binlogClient.setBinlogFilename(currentBinlogFile);
+            if (currentBinlogPosition != DO_NOT_SET) {
+                binlogClient.setBinlogPosition(currentBinlogPosition);
+            }
+
+            if (serverId != null) {
+                binlogClient.setServerId(serverId);
+            }
+
+            try {
+                if (connectTimeout == 0) {
+                    connectTimeout = Long.MAX_VALUE;
+                }
+                binlogClient.connect(connectTimeout);
+                transitUri = "mysql://" + connectedHost.getHostString() + ":" + connectedHost.getPort();
+
+            } catch (IOException | TimeoutException te) {
+                // Try the next host
+                connectedHost = null;
+                transitUri = "<unknown>";
+                currentHost = (currentHost + 1) % numHosts;
+                connectionAttempts++;
+            }
+        }
+        if (binlogClient == null || !binlogClient.isConnected()) {
+            binlogClient = null;
+            throw new IOException("Could not connect binlog client to any of the specified hosts");
+        }
+
+        if (createEnrichmentConnection) {
+            try {
+                jdbcConnection = getJdbcConnection(driverLocation, driverName, connectedHost, username, password, null);
+            } catch (InitializationException | SQLException e) {
+                throw new IOException("Error creating binlog enrichment JDBC 
connection to any of the specified hosts", e);
+            }
+        }
+
+        doStop.set(false);
+    }
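+
+    // A sketch of the failover behavior in connect() (assuming, say, three hosts): if the host at
+    // index currentHost fails to connect within connectTimeout, currentHost advances modulo numHosts
+    // and the next host is tried; after numHosts failed attempts the loop exits and an IOException
+    // is thrown.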
+
+
+    public void outputEvents(ProcessSession session, StateManager stateManager, ComponentLog log) throws IOException {
+        RawBinlogEvent rawBinlogEvent;
+
+        // Drain the queue
+        while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) {
+            Event event = rawBinlogEvent.getEvent();
+            EventHeaderV4 header = event.getHeader();
+            long timestamp = header.getTimestamp();
+            EventType eventType = header.getEventType();
+            // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume at the event about to be processed.
+            // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
+            // advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
+            // is filled in during the ROTATE case
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+                currentBinlogPosition = header.getPosition();
+            }
+            log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
+            switch (eventType) {
+                case TABLE_MAP:
+                    // This is sent to inform which table is about to be changed by subsequent events
+                    TableMapEventData data = event.getData();
+
+                    // Should we skip this table? Yes if we've specified a DB or table name pattern and they don't match
+                    skipTable = (databaseNamePattern != null && !databaseNamePattern.matcher(data.getDatabase()).matches())
+                            || (tableNamePattern != null && !tableNamePattern.matcher(data.getTable()).matches());
+
+                    if (!skipTable) {
+                        TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
+                        if (cacheClient != null) {
+                            try {
+                                currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer);
+                            } catch (ConnectException ce) {
+                                throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce);
+                            }
+
+                            if (currentTable == null) {
+                                // We don't have an entry for this table yet, so fetch the info from the database and populate the cache
+                                try {
+                                    currentTable = loadTableInfo(key);
+                                    try {
+                                        cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer);
+                                    } catch (ConnectException ce) {
+                                        throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce);
+                                    }
+                                } catch (SQLException se) {
+                                    // Propagate the error up, so things like rollback and logging/bulletins can be handled
+                                    throw new IOException(se.getMessage(), se);
+                                }
+                            }
+                        }
+                    } else {
+                        // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about
+                        currentTable = null;
+                    }
+                    break;
+                case QUERY:
+                    // Is this the start of a transaction?
+                    QueryEventData queryEventData = event.getData();
+                    String sql = queryEventData.getSql();
+                    if ("BEGIN".equals(sql)) {
+                        // If we're already in a transaction, something bad happened; alert the user
+                        if (inTransaction) {
+                            throw new IOException("BEGIN event received while 
already processing a transaction. This could indicate that your binlog position 
is invalid.");
+                        }
+                        // Mark the current binlog position in case we have to roll back the transaction (e.g. if the processor is stopped)
+                        xactBinlogFile = currentBinlogFile;
+                        xactBinlogPosition = currentBinlogPosition;
+                        xactSequenceId = currentSequenceId.get();
+
+                        BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                        currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
+                        inTransaction = true;
+                    } else if ("COMMIT".equals(sql)) {
+                        if (!inTransaction) {
+                            throw new IOException("COMMIT event received while 
not processing a transaction (i.e. no corresponding BEGIN event). "
+                                    + "This could indicate that your binlog 
position is invalid.");
+                        }
+                        // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
+                        CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                        currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                        // Commit the NiFi session
+                        session.commit();
+                        inTransaction = false;
+                        currentTable = null;
+
+                    } else {
+                        // Check for schema change events (e.g. ALTER TABLE). Normalize the query to do string matching on the type of change
+                        String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " ");
+                        if (normalizedQuery.startsWith("alter table")
+                                || normalizedQuery.startsWith("alter ignore table")
+                                || normalizedQuery.startsWith("create table")
+                                || normalizedQuery.startsWith("drop table")
+                                || normalizedQuery.startsWith("drop database")) {
+                            SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
+                            currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
+                            // Remove all the keys from the cache that this processor added
+                            if (cacheClient != null) {
+                                cacheClient.removeByPattern(this.getIdentifier() + ".*");
+                            }
+                        }
+                    }
+                    break;
+
+                case XID:
+                    if (!inTransaction) {
+                        throw new IOException("COMMIT event received while not 
processing a transaction (i.e. no corresponding BEGIN event). "
+                                + "This could indicate that your binlog 
position is invalid.");
+                    }
+                    CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
+                    currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+                    // Commit the NiFi session
+                    session.commit();
+                    inTransaction = false;
+                    currentTable = null;
+                    break;
+
+                case WRITE_ROWS:
+                case EXT_WRITE_ROWS:
+                case PRE_GA_WRITE_ROWS:
+                case UPDATE_ROWS:
+                case EXT_UPDATE_ROWS:
+                case PRE_GA_UPDATE_ROWS:
+                case DELETE_ROWS:
+                case EXT_DELETE_ROWS:
+                case PRE_GA_DELETE_ROWS:
+                    // If we are skipping this table, then don't emit any events related to its modification
+                    if (skipTable) {
+                        break;
+                    }
+                    if (!inTransaction) {
+                        // These events should only happen inside a transaction; warn the user otherwise
+                        log.warn("Table modification event occurred outside of a transaction.");
+                        break;
+                    }
+                    if (currentTable == null && cacheClient != null) {
+                        // No Table Map event was processed prior to this event, which should not happen, so throw an error
+                        throw new RowEventException("No table information is available for this event, cannot process further.");
+                    }
+
+                    if (eventType == WRITE_ROWS
+                            || eventType == EXT_WRITE_ROWS
+                            || eventType == PRE_GA_WRITE_ROWS) {
+
+                        InsertRowsEventInfo eventInfo = new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+
+                    } else if (eventType == DELETE_ROWS
+                            || eventType == EXT_DELETE_ROWS
+                            || eventType == PRE_GA_DELETE_ROWS) {
+
+                        DeleteRowsEventInfo eventInfo = new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+
+                    } else {
+                        // Update event
+                        UpdateRowsEventInfo eventInfo = new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
+                        currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+                    }
+                    break;
+
+                case ROTATE:
+                    // Update current binlog filename
+                    RotateEventData rotateEventData = event.getData();
+                    currentBinlogFile = rotateEventData.getBinlogFilename();
+                    currentBinlogPosition = rotateEventData.getBinlogPosition();
+                    break;
+                default:
+                    break;
+            }
+
+            // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
+            // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
+            // advance the position if it is not that type of event.
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
+                currentBinlogPosition = header.getNextPosition();
+            }
+        }
+    }
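+
+    /*
+     * Illustrative binlog event sequence for a single-row InnoDB INSERT (hypothetical), as handled
+     * by the switch in outputEvents() above:
+     *   QUERY("BEGIN")   -> BeginTransactionEventInfo written, inTransaction = true
+     *   TABLE_MAP        -> currentTable resolved via the cache or loadTableInfo()
+     *   EXT_WRITE_ROWS   -> InsertRowsEventInfo written
+     *   XID              -> CommitTransactionEventInfo written, session committed, inTransaction = false
+     */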
+
+    protected void stop(StateManager stateManager) throws CDCException {
+        try {
+            if (binlogClient != null) {
+                binlogClient.disconnect();
+            }
+            if (eventListener != null) {
+                eventListener.stop();
+                if (binlogClient != null) {
+                    binlogClient.unregisterEventListener(eventListener);
+                }
+            }
+            doStop.set(true);
+
+            if (hasRun.getAndSet(false)) {
+                updateState(stateManager, currentBinlogFile, currentBinlogPosition, currentSequenceId.get());
+            }
+            currentBinlogPosition = -1;
+
+        } catch (IOException e) {
+            throw new CDCException("Error closing CDC connection", e);
+        } finally {
+            binlogClient = null;
+        }
+    }
+
+    private void updateState(StateManager stateManager, String binlogFile, long binlogPosition, long sequenceId) throws IOException {
+        // Update state with latest values
+        if (stateManager != null) {
+            Map<String, String> newStateMap = new HashMap<>(stateManager.getState(Scope.CLUSTER).toMap());
+
+            // Save current binlog filename and position to the state map
+            if (binlogFile != null) {
+                newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
+            }
+            newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
+            newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+            stateManager.setState(newStateMap, Scope.CLUSTER);
+        }
+    }
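+
+    // Example (hypothetical values): updateState(stateManager, "mysql-bin.000003", 1543L, 42L) stores
+    // BINLOG_FILENAME_KEY -> "mysql-bin.000003", BINLOG_POSITION_KEY -> "1543" and
+    // SEQUENCE_ID_KEY -> "42" in the cluster-scoped state map.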
+
+
+    /**
+     * Creates and returns a BinlogEventListener instance, associated with the specified binlog client and event queue.
+     *
+     * @param client A reference to a BinaryLogClient. The listener is associated with the given client, such that the listener is notified when
+     *               events are available to the given client.
+     * @param q      A queue used to communicate events between the listener and the NiFi processor thread.
+     * @return A BinlogEventListener instance, which will be notified of events associated with the specified client
+     */
+    BinlogEventListener createBinlogEventListener(BinaryLogClient client, LinkedBlockingQueue<RawBinlogEvent> q) {
+        return new BinlogEventListener(client, q);
+    }
+
+    /**
+     * Creates and returns a BinlogLifecycleListener instance.
+     *
+     * @return A BinlogLifecycleListener instance, which will be notified of lifecycle events for the binlog client it is registered with
+     */
+    BinlogLifecycleListener createBinlogLifecycleListener() {
+        return new BinlogLifecycleListener();
+    }
+
+
+    BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
+        return new BinaryLogClient(hostname, port, username, password);
+    }
+
+    /**
+     * Retrieves the column information for the specified database and table. The column information can be used to enrich CDC events coming from the RDBMS.
+     *
+     * @param key A TableInfoCacheKey reference, which contains the database and table names
+     * @return A TableInfo instance with the ColumnDefinitions provided (if retrieved successfully from the database)
+     */
+    protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
+        TableInfo tableInfo = null;
+        if (jdbcConnection != null) {
+            try (Statement s = jdbcConnection.createStatement()) {
+                s.execute("USE " + key.getDatabaseName());
+                ResultSet rs = s.executeQuery("SELECT * FROM " + key.getTableName() + " LIMIT 0");
+                ResultSetMetaData rsmd = rs.getMetaData();
+                int numCols = rsmd.getColumnCount();
+                List<ColumnDefinition> columnDefinitions = new ArrayList<>();
+                for (int i = 1; i <= numCols; i++) {
+                    // Use the column label if it exists, otherwise use the column name. We're not doing aliasing here, but preferring the label is better practice.
+                    String columnLabel = rsmd.getColumnLabel(i);
+                    columnDefinitions.add(new ColumnDefinition(rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)));
+                }
+
+                tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), columnDefinitions);
+            }
+        }
+
+        return tableInfo;
+    }
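+
+    // The "SELECT * FROM <table> LIMIT 0" query above returns no rows but still exposes
+    // ResultSetMetaData, so column names and types can be read without scanning the table.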
+
+    /**
+     * Using Thread.currentThread().getContextClassLoader() will ensure that you are using the ClassLoader for your NAR.
+     *
+     * @throws InitializationException if there is a problem obtaining the ClassLoader
+     */
+    protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
+            throws InitializationException, SQLException {
+        if (locationString != null && locationString.length() > 0) {
+            try {
+                // Split and trim the entries
+                final ClassLoader classLoader = ClassLoaderUtils.getCustomClassLoader(
+                        locationString,
+                        this.getClass().getClassLoader(),
+                        (dir, name) -> name != null && name.endsWith(".jar")
+                );
+
+                // Workaround that allows a URLClassLoader to be used for JDBC driver loading.
+                // (Because the DriverManager will refuse to use a driver not loaded by the system ClassLoader.)
+                final Class<?> clazz = Class.forName(drvName, true, classLoader);
+                if (clazz == null) {
+                    throw new InitializationException("Can't load Database 
Driver " + drvName);
+                }
+                final Driver driver = (Driver) clazz.newInstance();
+                DriverManager.registerDriver(new DriverShim(driver));
+
+            } catch (final MalformedURLException e) {
+                throw new InitializationException("Invalid Database Driver Jar 
Url", e);
+            } catch (final Exception e) {
+                throw new InitializationException("Can't load Database 
Driver", e);
+            }
+        }
+        Properties connectionProps = new Properties();
+        if (customProperties != null) {
+            connectionProps.putAll(customProperties);
+        }
+        connectionProps.put("user", username);
+        connectionProps.put("password", password);
+
+        return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps);
+    }
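+
+    /*
+     * DriverManager only accepts drivers whose classes were loaded by a ClassLoader it can see, so a
+     * driver loaded from the custom ClassLoader above cannot be registered directly. DriverShim
+     * (below) wraps that driver in a class loaded by the system ClassLoader and delegates every call.
+     */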
+
+    private static class DriverShim implements Driver {
+        private Driver driver;
+
+        DriverShim(Driver d) {
+            this.driver = d;
+        }
+
+        @Override
+        public boolean acceptsURL(String u) throws SQLException {
+            return this.driver.acceptsURL(u);
+        }
+
+        @Override
+        public Connection connect(String u, Properties p) throws SQLException {
+            return this.driver.connect(u, p);
+        }
+
+        @Override
+        public int getMajorVersion() {
+            return this.driver.getMajorVersion();
+        }
+
+        @Override
+        public int getMinorVersion() {
+            return this.driver.getMinorVersion();
+        }
+
+        @Override
+        public DriverPropertyInfo[] getPropertyInfo(String u, Properties p) throws SQLException {
+            return this.driver.getPropertyInfo(u, p);
+        }
+
+        @Override
+        public boolean jdbcCompliant() {
+            return this.driver.jdbcCompliant();
+        }
+
+        @Override
+        public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+            return driver.getParentLogger();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..ca68f64
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
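+#
+# This file registers the processor with java.util.ServiceLoader so the NiFi framework
+# can discover CaptureChangeMySQL when the NAR is loaded.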
+org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL
