This is an automated email from the ASF dual-hosted git repository.

nsabonyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 78fd7fadcd NIFI-11380: Refactor CaptureChangeMySQL with improvements
78fd7fadcd is described below

commit 78fd7fadcda5948fe8a7847d2c07f83373c63d1c
Author: Matthew Burgess <[email protected]>
AuthorDate: Mon Apr 3 16:14:35 2023 -0400

    NIFI-11380: Refactor CaptureChangeMySQL with improvements
    
    This closes #7116
    
    Signed-off-by: Nandor Soma Abonyi <[email protected]>
---
 .../java/org/apache/nifi/cdc/event/TableInfo.java  |  68 ---
 .../apache/nifi/cdc/event/TableInfoCacheKey.java   |  21 -
 .../nifi-cdc-mysql-processors/pom.xml              |   2 +-
 .../nifi/cdc/mysql/event/DataCaptureState.java     |  82 +++
 .../apache/nifi/cdc/mysql/event/MySQLCDCUtils.java |  47 --
 .../cdc/mysql/event/handler/BeginEventHandler.java |  50 ++
 .../mysql/event/handler/BinlogEventHandler.java    |  37 ++
 .../mysql/event/handler/CommitEventHandler.java    |  63 +++
 .../cdc/mysql/event/handler/DDLEventHandler.java   |  52 ++
 .../mysql/event/handler/DeleteEventHandler.java    |  48 ++
 .../mysql/event/handler/InsertEventHandler.java    |  47 ++
 .../mysql/event/handler/UpdateEventHandler.java    |  48 ++
 .../event/io/AbstractBinlogTableEventWriter.java   |  24 +
 .../nifi/cdc/mysql/event/io/DeleteRowsWriter.java  |   3 +-
 .../nifi/cdc/mysql/event/io/InsertRowsWriter.java  |   3 +-
 .../nifi/cdc/mysql/event/io/UpdateRowsWriter.java  |   5 +-
 .../cdc/mysql/processors/CaptureChangeMySQL.java   | 630 ++++++++++-----------
 .../mysql/processors/CaptureChangeMySQLTest.groovy | 209 +------
 .../TestInsertRowsWriter.java}                     |  19 +-
 19 files changed, 779 insertions(+), 679 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
index d59306b537..ada962a175 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
@@ -17,22 +17,14 @@
 package org.apache.nifi.cdc.event;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
-import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * A POJO for holding table information related to update events.
  */
 public class TableInfo {
 
-    final static String DB_TABLE_NAME_DELIMITER = "@!@";
-
     private String databaseName;
     private String tableName;
     private Long tableId;
@@ -92,64 +84,4 @@ public class TableInfo {
         result = 31 * result + (columns != null ? columns.hashCode() : 0);
         return result;
     }
-
-    public static class Serializer implements 
org.apache.nifi.distributed.cache.client.Serializer<TableInfo> {
-
-        @Override
-        public void serialize(TableInfo value, OutputStream output) throws 
SerializationException, IOException {
-            StringBuilder sb = new StringBuilder(value.getDatabaseName());
-            sb.append(DB_TABLE_NAME_DELIMITER);
-            sb.append(value.getTableName());
-            sb.append(DB_TABLE_NAME_DELIMITER);
-            sb.append(value.getTableId());
-            List<ColumnDefinition> columnDefinitions = value.getColumns();
-            if (columnDefinitions != null && !columnDefinitions.isEmpty()) {
-                sb.append(DB_TABLE_NAME_DELIMITER);
-                sb.append(columnDefinitions.stream().map((col) -> 
col.getName() + DB_TABLE_NAME_DELIMITER + 
col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER)));
-            }
-            output.write(sb.toString().getBytes());
-        }
-    }
-
-    public static class Deserializer implements 
org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> {
-
-        @Override
-        public TableInfo deserialize(byte[] input) throws 
DeserializationException, IOException {
-            // Don't bother deserializing if empty, just return null. This 
usually happens when the key is not found in the cache
-            if (input == null || input.length == 0) {
-                return null;
-            }
-            String inputString = new String(input);
-            String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER);
-            int numTokens = tokens.length;
-            if (numTokens < 3) {
-                throw new IOException("Could not deserialize TableInfo from 
the following value: " + inputString);
-            }
-            String dbName = tokens[0];
-            String tableName = tokens[1];
-            Long tableId;
-            try {
-                tableId = Long.parseLong(tokens[2]);
-            } catch (NumberFormatException nfe) {
-                throw new IOException("Illegal table ID: " + tokens[2]);
-            }
-            // Parse column names and types
-            List<ColumnDefinition> columnDefinitions = new ArrayList<>();
-            for (int i = 0; i < numTokens - 3; i += 2) {
-                try {
-                    int columnTypeIndex = i + 4;
-                    int columnNameIndex = i + 3;
-                    if (columnTypeIndex < numTokens) {
-                        columnDefinitions.add(new 
ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), 
tokens[columnNameIndex]));
-                    } else {
-                        throw new IOException("No type detected for column: " 
+ tokens[columnNameIndex]);
-                    }
-                } catch (NumberFormatException nfe) {
-                    throw new IOException("Illegal column type value for 
column " + (i / 2 + 1) + ": " + tokens[i + 4]);
-                }
-            }
-
-            return new TableInfo(dbName, tableName, tableId, 
columnDefinitions);
-        }
-    }
 }
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
index 2f185e9b7e..81392642bf 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
@@ -17,12 +17,6 @@
 package org.apache.nifi.cdc.event;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
-import 
org.apache.nifi.distributed.cache.client.exception.SerializationException;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER;
 
 /**
  * This class represents a key in a cache that contains information (column 
definitions, e.g.) for a database table
@@ -80,19 +74,4 @@ public class TableInfoCacheKey {
     public String getUuidPrefix() {
         return uuidPrefix;
     }
-
-    public static class Serializer implements 
org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> {
-
-        @Override
-        public void serialize(TableInfoCacheKey key, OutputStream output) 
throws SerializationException, IOException {
-            StringBuilder sb = new StringBuilder(key.getUuidPrefix());
-            sb.append(DB_TABLE_NAME_DELIMITER);
-            sb.append(key.getDatabaseName());
-            sb.append(DB_TABLE_NAME_DELIMITER);
-            sb.append(key.getTableName());
-            sb.append(DB_TABLE_NAME_DELIMITER);
-            sb.append(key.getTableId());
-            output.write(sb.toString().getBytes());
-        }
-    }
 }
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
index 9dddc26a0c..d8cc4801e7 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
@@ -41,7 +41,7 @@ language governing permissions and limitations under the 
License. -->
         <dependency>
             <groupId>com.zendesk</groupId>
             <artifactId>mysql-binlog-connector-java</artifactId>
-            <version>0.27.6</version>
+            <version>0.28.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java
new file mode 100644
index 0000000000..08ad96c8c8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+public class DataCaptureState {
+
+    private String binlogFile = null;
+    private long binlogPosition = 4;
+
+    private boolean useGtid = false;
+    private String gtidSet = null;
+    private long sequenceId = 0L;
+
+    public DataCaptureState() {
+    }
+
+    public DataCaptureState(String binlogFile, long binlogPosition, boolean 
useGtid, String gtidSet, long sequenceId) {
+        this.binlogFile = binlogFile;
+        this.binlogPosition = binlogPosition;
+        this.useGtid = useGtid;
+        this.gtidSet = gtidSet;
+        this.sequenceId = sequenceId;
+    }
+
+    public String getBinlogFile() {
+        return binlogFile;
+    }
+
+    public void setBinlogFile(String binlogFile) {
+        this.binlogFile = binlogFile;
+    }
+
+    public long getBinlogPosition() {
+        return binlogPosition;
+    }
+
+    public void setBinlogPosition(long binlogPosition) {
+        this.binlogPosition = binlogPosition;
+    }
+
+    public boolean isUseGtid() {
+        return useGtid;
+    }
+
+    public void setUseGtid(boolean useGtid) {
+        this.useGtid = useGtid;
+    }
+
+    public String getGtidSet() {
+        return gtidSet;
+    }
+
+    public void setGtidSet(String gtidSet) {
+        this.gtidSet = gtidSet;
+    }
+
+    public long getSequenceId() {
+        return sequenceId;
+    }
+
+    public void setSequenceId(long sequenceId) {
+        this.sequenceId = sequenceId;
+    }
+
+    public DataCaptureState copy() {
+        return new DataCaptureState(binlogFile, binlogPosition, useGtid, 
gtidSet, sequenceId);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
deleted file mode 100644
index 313bfce763..0000000000
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql.event;
-
-import java.io.Serializable;
-
-/**
- * A utility class to provide MySQL- / binlog-specific constants and methods 
for processing events and data
- */
-public class MySQLCDCUtils {
-
-    public static Object getWritableObject(Integer type, Serializable value) {
-        if (value == null) {
-            return null;
-        }
-        if (type == null) {
-            if (value instanceof byte[]) {
-                return new String((byte[]) value);
-            } else if (value instanceof Number) {
-                return value;
-            }
-        } else if (value instanceof Number) {
-            return value;
-        } else {
-            if (value instanceof byte[]) {
-                return new String((byte[]) value);
-            } else {
-                return value.toString();
-            }
-        }
-        return null;
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java
new file mode 100644
index 0000000000..a58319d20f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class BeginEventHandler implements BinlogEventHandler<QueryEventData, 
BeginTransactionEventInfo> {
+
+    private final BeginTransactionEventWriter eventWriter = new 
BeginTransactionEventWriter();
+
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean 
writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration 
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (writeEvent) {
+            final String currentDatabase = eventData.getDatabase();
+            final BeginTransactionEventInfo beginEvent = 
dataCaptureState.isUseGtid()
+                    ? new BeginTransactionEventInfo(currentDatabase, 
timestamp, dataCaptureState.getGtidSet())
+                    : new BeginTransactionEventInfo(currentDatabase, 
timestamp, dataCaptureState.getBinlogFile(), 
dataCaptureState.getBinlogPosition());
+
+            binlogEventState.setCurrentEventInfo(beginEvent);
+            binlogEventState.setCurrentEventWriter(eventWriter);
+            dataCaptureState.setSequenceId(eventWriter.writeEvent(session, 
binlogResourceInfo.getTransitUri(), beginEvent, 
dataCaptureState.getSequenceId(),
+                    REL_SUCCESS, eventWriterConfiguration));
+        }
+        binlogResourceInfo.setInTransaction(true);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java
new file mode 100644
index 0000000000..44b8d505e7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends 
BinlogEventInfo> {
+
+    void handleEvent(final T eventData,
+                     final boolean writeEvent,
+                     final DataCaptureState dataCaptureState,
+                     final CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo,
+                     final CaptureChangeMySQL.BinlogEventState 
binlogEventState,
+                     final String sql,
+                     final EventWriterConfiguration eventWriterConfiguration,
+                     final ProcessSession session,
+                     final long timestamp);
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java
new file mode 100644
index 0000000000..14027204ab
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class CommitEventHandler implements BinlogEventHandler<EventData, 
CommitTransactionEventInfo> {
+
+    private final CommitTransactionEventWriter eventWriter = new 
CommitTransactionEventWriter();
+
+    @Override
+    public void handleEvent(final EventData eventData, final boolean 
writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration 
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        final String currentDatabase = binlogResourceInfo.getCurrentDatabase();
+        final CommitTransactionEventInfo commitEvent = 
dataCaptureState.isUseGtid()
+                ? new CommitTransactionEventInfo(currentDatabase, timestamp, 
dataCaptureState.getGtidSet())
+                : new CommitTransactionEventInfo(currentDatabase, timestamp, 
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition());
+
+        if (writeEvent) {
+            binlogEventState.setCurrentEventInfo(commitEvent);
+            binlogEventState.setCurrentEventWriter(eventWriter);
+            dataCaptureState.setSequenceId(eventWriter.writeEvent(session, 
binlogResourceInfo.getTransitUri(), commitEvent, 
dataCaptureState.getSequenceId(),
+                    REL_SUCCESS, eventWriterConfiguration));
+        } else {
+            // If the COMMIT event is not to be written, the FlowFile should 
still be finished and the session committed.
+            if (session != null) {
+                FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
+                if (flowFile != null) {
+                    // Flush the events written so far by finishing and 
transferring the current FlowFile
+                    eventWriter.finishAndTransferFlowFile(session, 
eventWriterConfiguration, binlogResourceInfo.getTransitUri(), 
dataCaptureState.getSequenceId(), commitEvent, REL_SUCCESS);
+                }
+                session.commitAsync();
+            }
+        }
+
+        // Record on binlogResourceInfo that no transaction is in progress
+        binlogResourceInfo.setInTransaction(false);
+    }
+}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java
new file mode 100644
index 0000000000..698d7fe011
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData, 
DDLEventInfo> {
+
+    private final DDLEventWriter eventWriter = new DDLEventWriter();
+
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean 
writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration 
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (writeEvent) {
+            final TableInfo ddlTableInfo = 
binlogResourceInfo.getCurrentTable() != null
+                    ? binlogResourceInfo.getCurrentTable()
+                    : new TableInfo(binlogResourceInfo.getCurrentDatabase(), 
null, null, null);
+            final DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()
+                    ? new DDLEventInfo(ddlTableInfo, timestamp, 
dataCaptureState.getGtidSet(), sql)
+                    : new DDLEventInfo(ddlTableInfo, timestamp, 
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), sql);
+
+            binlogEventState.setCurrentEventInfo(ddlEvent);
+            binlogEventState.setCurrentEventWriter(eventWriter);
+            dataCaptureState.setSequenceId(eventWriter.writeEvent(session, 
binlogResourceInfo.getTransitUri(), ddlEvent, dataCaptureState.getSequenceId(),
+                    REL_SUCCESS, eventWriterConfiguration));
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java
new file mode 100644
index 0000000000..453a779aaf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DeleteEventHandler implements 
BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> {
+
+    private final DeleteRowsWriter eventWriter = new DeleteRowsWriter();
+
+    @Override
+    public void handleEvent(final DeleteRowsEventData eventData, final boolean 
writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration 
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (writeEvent) {
+            final DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
+                    ? new 
DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, 
dataCaptureState.getGtidSet(), eventData)
+                    : new 
DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, 
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), 
eventData);
+
+            binlogEventState.setCurrentEventInfo(eventInfo);
+            binlogEventState.setCurrentEventWriter(eventWriter);
+            dataCaptureState.setSequenceId(eventWriter.writeEvent(session, 
binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
+                    REL_SUCCESS, eventWriterConfiguration));
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java
new file mode 100644
index 0000000000..0feae1f277
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+/**
+ * Handles binlog write-rows (insert) events by wrapping the event data in an {@link InsertRowsEventInfo}
+ * and passing it to an {@link InsertRowsWriter}.
+ */
+public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+
+    private final InsertRowsWriter eventWriter = new InsertRowsWriter();
+
+    /**
+     * Writes the insert event when {@code writeEvent} is {@code true}; otherwise does nothing.
+     * On a write, the created event info and this handler's writer are recorded on
+     * {@code binlogEventState}, and the value returned by the writer is stored as the new sequence id
+     * on {@code dataCaptureState}. The {@code sql} argument is not used by this handler.
+     */
+    @Override
+    public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (!writeEvent) {
+            return;
+        }
+
+        // With GTID tracking the event info carries the GTID set, otherwise the binlog file name and position
+        final InsertRowsEventInfo eventInfo;
+        if (dataCaptureState.isUseGtid()) {
+            eventInfo = new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
+                    dataCaptureState.getGtidSet(), eventData);
+        } else {
+            eventInfo = new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
+                    dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
+        }
+
+        binlogEventState.setCurrentEventInfo(eventInfo);
+        binlogEventState.setCurrentEventWriter(eventWriter);
+
+        // The writer returns the sequence id to continue from
+        dataCaptureState.setSequenceId(
+                eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo,
+                        dataCaptureState.getSequenceId(), REL_SUCCESS, eventWriterConfiguration));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java
new file mode 100644
index 0000000000..2d538af06e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+/**
+ * Handles binlog update-rows events by wrapping the event data in an {@link UpdateRowsEventInfo}
+ * and passing it to an {@link UpdateRowsWriter}.
+ */
+public class UpdateEventHandler implements BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> {
+
+    private final UpdateRowsWriter eventWriter = new UpdateRowsWriter();
+
+    /**
+     * Writes the update event when {@code writeEvent} is {@code true}; otherwise does nothing.
+     * On a write, the created event info and this handler's writer are recorded on
+     * {@code binlogEventState}, and the value returned by the writer is stored as the new sequence id
+     * on {@code dataCaptureState}. The {@code sql} argument is not used by this handler.
+     */
+    @Override
+    public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (!writeEvent) {
+            return;
+        }
+
+        // With GTID tracking the event info carries the GTID set, otherwise the binlog file name and position
+        final UpdateRowsEventInfo eventInfo;
+        if (dataCaptureState.isUseGtid()) {
+            eventInfo = new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
+                    dataCaptureState.getGtidSet(), eventData);
+        } else {
+            eventInfo = new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
+                    dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
+        }
+
+        binlogEventState.setCurrentEventInfo(eventInfo);
+        binlogEventState.setCurrentEventWriter(eventWriter);
+
+        // The writer returns the sequence id to continue from
+        dataCaptureState.setSequenceId(
+                eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo,
+                        dataCaptureState.getSequenceId(), REL_SUCCESS, eventWriterConfiguration));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
index f618a76a6f..013b63edf9 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
@@ -19,12 +19,36 @@ package org.apache.nifi.cdc.mysql.event.io;
 import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * An abstract base class for writing MYSQL table-related binlog events into 
flow file(s), e.g.
  */
 public abstract class AbstractBinlogTableEventWriter<T extends 
BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
 
+    /**
+     * Converts a raw column value into the object written as the JSON field value.
+     * <ul>
+     *   <li>{@code null} is returned as {@code null}</li>
+     *   <li>a {@code byte[]} is turned into a {@link String}</li>
+     *   <li>a {@link Number} is returned unchanged</li>
+     *   <li>any other value is returned as {@code value.toString()} when {@code type} is non-null,
+     *       and as {@code null} when {@code type} is {@code null}</li>
+     * </ul>
+     * NOTE(review): the byte[] conversion uses {@code new String(byte[])}, i.e. the platform default
+     * charset - confirm this matches the column encoding expected by consumers.
+     *
+     * @param type  the column type, or {@code null} if it is not known
+     * @param value the raw column value, may be {@code null}
+     * @return the object to write, or {@code null}
+     */
+    protected Object getWritableObject(Integer type, Serializable value) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof byte[]) {
+            return new String((byte[]) value);
+        }
+        if (value instanceof Number) {
+            return value;
+        }
+        // Values that are neither bytes nor numbers are only rendered when a column type is available
+        return type == null ? null : value.toString();
+    }
+
     protected void writeJson(T event) throws IOException {
         super.writeJson(event);
         if (event.getDatabaseName() != null) {
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
index 4828ce8662..8bf4b0220b 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.cdc.mysql.event.io;
 
 import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
@@ -89,7 +88,7 @@ public class DeleteRowsWriter extends 
AbstractBinlogTableEventWriter<DeleteRowsE
             if (row[i] == null) {
                 jsonGenerator.writeNullField("value");
             } else {
-                jsonGenerator.writeObjectField("value", 
MySQLCDCUtils.getWritableObject(columnType, row[i]));
+                jsonGenerator.writeObjectField("value", 
getWritableObject(columnType, row[i]));
             }
             jsonGenerator.writeEndObject();
             i = includedColumns.nextSetBit(i + 1);
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
index 4453889d53..b4fa31f787 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.cdc.mysql.event.io;
 
 import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
@@ -90,7 +89,7 @@ public class InsertRowsWriter extends 
AbstractBinlogTableEventWriter<InsertRowsE
             if (row[i] == null) {
                 jsonGenerator.writeNullField("value");
             } else {
-                jsonGenerator.writeObjectField("value", 
MySQLCDCUtils.getWritableObject(columnType, row[i]));
+                jsonGenerator.writeObjectField("value", 
getWritableObject(columnType, row[i]));
             }
             jsonGenerator.writeEndObject();
             i = includedColumns.nextSetBit(i + 1);
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
index 4a9a2d9785..c860ffdaab 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.cdc.mysql.event.io;
 
 import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
@@ -95,13 +94,13 @@ public class UpdateRowsWriter extends 
AbstractBinlogTableEventWriter<UpdateRowsE
             if (oldRow[i] == null) {
                 jsonGenerator.writeNullField("last_value");
             } else {
-                jsonGenerator.writeObjectField("last_value", 
MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
+                jsonGenerator.writeObjectField("last_value", 
getWritableObject(columnType, oldRow[i]));
             }
 
             if (newRow[i] == null) {
                 jsonGenerator.writeNullField("value");
             } else {
-                jsonGenerator.writeObjectField("value", 
MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
+                jsonGenerator.writeObjectField("value", 
getWritableObject(columnType, newRow[i]));
             }
             jsonGenerator.writeEndObject();
             i = includedColumns.nextSetBit(i + 1);
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 76ada9b306..d63ac8bc15 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.event.Event;
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
 import com.github.shyiko.mysql.binlog.event.EventType;
 import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.MySqlGtid;
 import com.github.shyiko.mysql.binlog.event.QueryEventData;
 import com.github.shyiko.mysql.binlog.event.RotateEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -36,7 +37,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
@@ -45,25 +45,20 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.event.RowEventException;
 import org.apache.nifi.cdc.event.TableInfo;
 import org.apache.nifi.cdc.event.TableInfoCacheKey;
-import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
 import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
 import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
-import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
-import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
-import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
 import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
-import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
-import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
 import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
-import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
-import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.event.handler.BeginEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.CommitEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.DDLEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.DeleteEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.InsertEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.UpdateEventHandler;
 import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
 import org.apache.nifi.cdc.mysql.processors.ssl.ConnectionPropertiesProvider;
 import 
org.apache.nifi.cdc.mysql.processors.ssl.StandardConnectionPropertiesProvider;
@@ -77,9 +72,7 @@ import org.apache.nifi.components.resource.ResourceType;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -96,7 +89,6 @@ import org.apache.nifi.ssl.SSLContextService;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.sql.Connection;
 import java.sql.Driver;
@@ -120,8 +112,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Logger;
 import java.util.regex.Pattern;
 
@@ -166,6 +156,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
     // Random invalid constant used as an indicator to not set the binlog 
position on the client (thereby using the latest available)
     private static final int DO_NOT_SET = -1000;
 
+    private static final int DEFAULT_MYSQL_PORT = 3306;
+
     // A regular expression matching multiline comments, used when parsing DDL 
statements
     private static final Pattern MULTI_COMMENT_PATTERN = 
Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
 
@@ -228,13 +220,14 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor HOSTS = new 
PropertyDescriptor.Builder()
             .name("capture-change-mysql-hosts")
-            .displayName("MySQL Hosts")
-            .description("A list of hostname/port entries corresponding to 
nodes in a MySQL cluster. The entries should be comma separated "
-                    + "using a colon such as host1:port,host2:port,....  For 
example mysql.myhost.com:3306. This processor will attempt to connect to "
+            .displayName("MySQL Nodes")
+            .description("A list of hostname (and optional port) entries 
corresponding to nodes in a MySQL cluster. The entries should be comma 
separated "
+                    + "using a colon (if the port is to be specified) such as 
host1:port,host2:port,....  For example mysql.myhost.com:3306. The port need 
not be specified, "
+                    + "when omitted the default MySQL port value of 3306 will 
be used. This processor will attempt to connect to "
                     + "the hosts in the list in order. If one node goes down 
and failover is enabled for the cluster, then the processor will connect "
-                    + "to the active node (assuming its host entry is 
specified in this property.  The default port for MySQL connections is 3306.")
+                    + "to the active node (assuming its node entry is 
specified in this property).")
             .required(true)
-            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
@@ -317,10 +310,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor DIST_CACHE_CLIENT = new 
PropertyDescriptor.Builder()
             .name("capture-change-mysql-dist-map-cache-client")
-            .displayName("Distributed Map Cache Client")
-            .description("Identifies a Distributed Map Cache Client controller 
service to be used for keeping information about the various table columns, 
datatypes, etc. "
-                    + "needed by the processor. If a client is not specified, 
the generated events will not include column type or name information (but they 
will include database "
-                    + "and table information.")
+            .displayName("Distributed Map Cache Client - unused")
+            .description("This is a legacy property that is no longer used to 
store table information, the processor will handle the table information 
(column names, types, etc.)")
             .identifiesControllerService(DistributedMapCacheClient.class)
             .required(false)
             .build();
@@ -464,58 +455,38 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, 
position, (sequence id), and gtid at the beginning of a transaction. Used for 
rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new 
HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new 
BinlogResourceInfo();
 
-    private volatile DistributedMapCacheClient cacheClient = null;
-    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new 
TableInfoCacheKey.Serializer();
-    private final Serializer<TableInfo> cacheValueSerializer = new 
TableInfo.Serializer();
-    private final Deserializer<TableInfo> cacheValueDeserializer = new 
TableInfo.Deserializer();
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+    private volatile boolean skipTable = false;
+    private int currentHost = 0;
+    private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
 
-    private JDBCConnectionHolder jdbcConnectionHolder = null;
+    private final BinlogEventState binlogEventState = new BinlogEventState();
 
-    private final BeginTransactionEventWriter beginEventWriter = new 
BeginTransactionEventWriter();
-    private final CommitTransactionEventWriter commitEventWriter = new 
CommitTransactionEventWriter();
-    private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
-    private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
-    private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
-    private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
+    private final BeginEventHandler beginEventHandler = new 
BeginEventHandler();
+    private final CommitEventHandler commitEventHandler = new 
CommitEventHandler();
+    private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
+    private final InsertEventHandler insertEventHandler = new 
InsertEventHandler();
+    private final DeleteEventHandler deleteEventHandler = new 
DeleteEventHandler();
+    private final UpdateEventHandler updateEventHandler = new 
UpdateEventHandler();
 
     private volatile EventWriterConfiguration eventWriterConfiguration;
-    private volatile BinlogEventInfo currentEventInfo;
-    private AbstractBinlogEventWriter<? extends BinlogEventInfo> 
currentEventWriter;
+
 
     static {
 
@@ -609,6 +580,9 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             return;
         }
 
+        // Get inTransaction value from state
+        
binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction")));
+
         // Build a event writer config object for the event writers to use
         final FlowFileEventWriteStrategy flowFileEventWriteStrategy = 
FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
         eventWriterConfiguration = new EventWriterConfiguration(
@@ -624,49 +598,52 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
         boolean getAllRecords = 
context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
 
-        includeBeginCommit = 
context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
-        includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
-        useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean();
+        if (binlogResourceInfo == null) {
+            binlogResourceInfo = new BinlogResourceInfo();
+        }
+        
currentDataCaptureState.setUseGtid(context.getProperty(USE_BINLOG_GTID).asBoolean());
 
-        if (useGtid) {
+        if (currentDataCaptureState.isUseGtid()) {
             // Set current gtid to whatever is in State, falling back to the 
Retrieve All Records then Initial Gtid if no State variable is present
-            currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY);
-            if (currentGtidSet == null) {
+            
currentDataCaptureState.setGtidSet(stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY));
+            if (currentDataCaptureState.getGtidSet() == null) {
                 if (!getAllRecords && 
context.getProperty(INIT_BINLOG_GTID).isSet()) {
-                    currentGtidSet = 
context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue();
+                    
currentDataCaptureState.setGtidSet(context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue());
                 } else {
                     // If we're starting from the beginning of all binlogs, 
the binlog gtid must be the empty string (not null)
-                    currentGtidSet = "";
+                    currentDataCaptureState.setGtidSet("");
                 }
             }
-            currentBinlogFile = "";
-            currentBinlogPosition = DO_NOT_SET;
+            currentDataCaptureState.setBinlogFile("");
+            currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
         } else {
             // Set current binlog filename to whatever is in State, falling 
back to the Retrieve All Records then Initial Binlog Filename if no State 
variable is present
-            currentBinlogFile = 
stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
+            final String currentBinlogFile = 
stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
             if (currentBinlogFile == null) {
                 if (!getAllRecords) {
                     if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
-                        currentBinlogFile = 
context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+                        
currentDataCaptureState.setBinlogFile(context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue());
                     }
                 } else {
                     // If we're starting from the beginning of all binlogs, 
the binlog filename must be the empty string (not null)
-                    currentBinlogFile = "";
+                    currentDataCaptureState.setBinlogFile("");
                 }
+            } else {
+                currentDataCaptureState.setBinlogFile(currentBinlogFile);
             }
 
             // Set current binlog position to whatever is in State, falling 
back to the Retrieve All Records then Initial Binlog Filename if no State 
variable is present
-            String binlogPosition = 
stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
+            final String binlogPosition = 
stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
             if (binlogPosition != null) {
-                currentBinlogPosition = Long.valueOf(binlogPosition);
+                
currentDataCaptureState.setBinlogPosition(Long.parseLong(binlogPosition));
             } else if (!getAllRecords) {
                 if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
-                    currentBinlogPosition = 
context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+                    
currentDataCaptureState.setBinlogPosition(context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong());
                 } else {
-                    currentBinlogPosition = DO_NOT_SET;
+                    currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
                 }
             } else {
-                currentBinlogPosition = -1;
+                currentDataCaptureState.setBinlogPosition(-1);
             }
         }
 
@@ -676,22 +653,10 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             // Use Initial Sequence ID property if none is found in state
             PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
             if (seqIdProp.isSet()) {
-                
currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger());
+                
currentDataCaptureState.setSequenceId(seqIdProp.evaluateAttributeExpressions().asInteger());
             }
         } else {
-            currentSequenceId.set(Long.parseLong(seqIdString));
-        }
-        //get inTransaction value from state
-        inTransaction = "true".equals(stateMap.get("inTransaction"));
-
-        // Get reference to Distributed Cache if one exists. If it does not, 
no enrichment (resolution of column names, e.g.) will be performed
-        boolean createEnrichmentConnection = false;
-        if (context.getProperty(DIST_CACHE_CLIENT).isSet()) {
-            cacheClient = 
context.getProperty(DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class);
-            createEnrichmentConnection = true;
-        } else {
-            logger.warn("No Distributed Map Cache Client is specified, so no 
event enrichment (resolution of column names, e.g.) will be performed.");
-            cacheClient = null;
+            currentDataCaptureState.setSequenceId(Long.parseLong(seqIdString));
         }
 
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE)
@@ -717,7 +682,7 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
             Long serverId = 
context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
 
-            connect(hosts, username, password, serverId, 
createEnrichmentConnection, driverLocation, driverName, connectTimeout, 
sslContextService, sslMode);
+            connect(hosts, username, password, serverId, driverLocation, 
driverName, connectTimeout, sslContextService, sslMode);
         } catch (IOException | IllegalStateException e) {
             if (eventListener != null) {
                 eventListener.stop();
@@ -735,8 +700,6 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
     @Override
     public synchronized void onTrigger(ProcessContext context, 
ProcessSessionFactory sessionFactory) throws ProcessException {
 
-        // Indicate that this processor has executed at least once, so we know 
whether or not the state values are valid and should be updated
-        hasRun.set(true);
         ComponentLog log = getLogger();
 
         // Create a client if we don't have one
@@ -773,16 +736,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         }
 
         try {
-            outputEvents(currentSession, log);
+            outputEvents(currentSession, context, log);
         } catch (Exception eventException) {
-            getLogger().error("Exception during event processing at file={} 
pos={}", currentBinlogFile, currentBinlogPosition, eventException);
+            getLogger().error("Exception during event processing at file={} 
pos={}", currentDataCaptureState.getBinlogFile(), 
currentDataCaptureState.getBinlogPosition(), eventException);
             try {
                 // Perform some processor-level "rollback", then rollback the 
session
-                currentBinlogFile = xactBinlogFile == null ? "" : 
xactBinlogFile;
-                currentBinlogPosition = xactBinlogPosition;
-                currentSequenceId.set(xactSequenceId);
-                currentGtidSet = xactGtidSet;
-                inTransaction = false;
+                binlogResourceInfo.setInTransaction(false);
                 stop();
             } catch (Exception e) {
                 // Not much we can recover from here
@@ -795,16 +754,6 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         }
     }
 
-    @OnStopped
-    @OnShutdown
-    public void onStopped(ProcessContext context) {
-        try {
-            stop();
-        } catch (CDCException ioe) {
-            throw new ProcessException(ioe);
-        }
-    }
-
     /**
      * Get a list of hosts from a NiFi property, e.g.
      *
@@ -821,16 +770,19 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
         for (String item : hostsSplit) {
             String[] addresses = item.split(":");
-            if (addresses.length != 2) {
+            if (addresses.length > 2 || addresses.length == 0) {
                 throw new ArrayIndexOutOfBoundsException("Not in host:port 
format");
+            } else if (addresses.length > 1) {
+                hostsList.add(new InetSocketAddress(addresses[0].trim(), 
Integer.parseInt(addresses[1].trim())));
+            } else {
+                // Assume default port of 3306
+                hostsList.add(new InetSocketAddress(addresses[0].trim(), 
DEFAULT_MYSQL_PORT));
             }
-
-            hostsList.add(new InetSocketAddress(addresses[0].trim(), 
Integer.parseInt(addresses[1].trim())));
         }
         return hostsList;
     }
 
-    protected void connect(List<InetSocketAddress> hosts, String username, 
String password, Long serverId, boolean createEnrichmentConnection,
+    protected void connect(List<InetSocketAddress> hosts, String username, 
String password, Long serverId,
                            String driverLocation, String driverName, long 
connectTimeout,
                            final SSLContextService sslContextService, final 
SSLMode sslMode) throws IOException {
 
@@ -839,17 +791,15 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         InetSocketAddress connectedHost = null;
         Exception lastConnectException = new Exception("Unknown connection 
error");
 
-        if (createEnrichmentConnection) {
-            try {
-                // Ensure driverLocation and driverName are correct before 
establishing binlog connection
-                // to avoid failing after binlog messages are received.
-                // Actual JDBC connection is created after binlog client gets 
started, because we need
-                // the connect-able host same as the binlog client.
-                registerDriver(driverLocation, driverName);
-            } catch (InitializationException e) {
-                throw new RuntimeException("Failed to register JDBC driver. 
Ensure MySQL Driver Location(s)" +
-                        " and MySQL Driver Class Name are configured 
correctly. " + e, e);
-            }
+        try {
+            // Ensure driverLocation and driverName are correct before 
establishing binlog connection
+            // to avoid failing after binlog messages are received.
+            // Actual JDBC connection is created after binlog client gets 
started, because we need
+            // the connect-able host same as the binlog client.
+            registerDriver(driverLocation, driverName);
+        } catch (InitializationException e) {
+            throw new RuntimeException("Failed to register JDBC driver. Ensure 
MySQL Driver Location(s)" +
+                    " and MySQL Driver Class Name are configured correctly. " 
+ e, e);
         }
 
         while (connectedHost == null && connectionAttempts < numHosts) {
@@ -871,12 +821,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             }
             binlogClient.registerLifecycleListener(lifecycleListener);
 
-            binlogClient.setBinlogFilename(currentBinlogFile);
-            if (currentBinlogPosition != DO_NOT_SET) {
-                binlogClient.setBinlogPosition(currentBinlogPosition);
+            
binlogClient.setBinlogFilename(currentDataCaptureState.getBinlogFile());
+            if (currentDataCaptureState.getBinlogPosition() != DO_NOT_SET) {
+                
binlogClient.setBinlogPosition(currentDataCaptureState.getBinlogPosition());
             }
 
-            binlogClient.setGtidSet(currentGtidSet);
+            binlogClient.setGtidSet(currentDataCaptureState.getGtidSet());
             binlogClient.setGtidSetFallbackToPurged(true);
 
             if (serverId != null) {
@@ -895,12 +845,12 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                     connectTimeout = Long.MAX_VALUE;
                 }
                 binlogClient.connect(connectTimeout);
-                transitUri = "mysql://" + connectedHost.getHostString() + ":" 
+ connectedHost.getPort();
+                binlogResourceInfo.setTransitUri("mysql://" + 
connectedHost.getHostString() + ":" + connectedHost.getPort());
 
             } catch (IOException | TimeoutException te) {
                 // Try the next host
                 connectedHost = null;
-                transitUri = "<unknown>";
+                binlogResourceInfo.setTransitUri("<unknown>");
                 currentHost = (currentHost + 1) % numHosts;
                 connectionAttempts++;
                 lastConnectException = te;
@@ -918,36 +868,37 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             throw new IOException("Could not connect binlog client to any of 
the specified hosts due to: " + lastConnectException.getMessage(), 
lastConnectException);
         }
 
-        if (createEnrichmentConnection) {
-            final TlsConfiguration tlsConfiguration = sslContextService == 
null ? null : sslContextService.createTlsConfiguration();
-            final ConnectionPropertiesProvider connectionPropertiesProvider = 
new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
-            final Map<String, String> jdbcConnectionProperties = 
connectionPropertiesProvider.getConnectionProperties();
-            jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, 
username, password, jdbcConnectionProperties, connectTimeout);
-            try {
-                // Ensure connection can be created.
-                getJdbcConnection();
-            } catch (SQLException e) {
-                getLogger().error("Error creating binlog enrichment JDBC 
connection to any of the specified hosts", e);
-                if (eventListener != null) {
-                    eventListener.stop();
-                    if (binlogClient != null) {
-                        binlogClient.unregisterEventListener(eventListener);
-                    }
-                }
+        final TlsConfiguration tlsConfiguration = sslContextService == null ? 
null : sslContextService.createTlsConfiguration();
+        final ConnectionPropertiesProvider connectionPropertiesProvider = new 
StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
+        final Map<String, String> jdbcConnectionProperties = 
connectionPropertiesProvider.getConnectionProperties();
+        jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, 
username, password, jdbcConnectionProperties, connectTimeout);
+        try {
+            // Ensure connection can be created.
+            getJdbcConnection();
+        } catch (SQLException e) {
+            getLogger().error("Error creating binlog enrichment JDBC 
connection to any of the specified hosts", e);
+            if (eventListener != null) {
+                eventListener.stop();
                 if (binlogClient != null) {
-                    binlogClient.disconnect();
-                    binlogClient = null;
+                    binlogClient.unregisterEventListener(eventListener);
                 }
-                return;
             }
+            if (binlogClient != null) {
+                binlogClient.disconnect();
+                binlogClient = null;
+            }
+            return;
         }
 
         gtidSet = new GtidSet(binlogClient.getGtidSet());
     }
 
 
-    public void outputEvents(ProcessSession session, ComponentLog log) throws 
IOException {
+    public void outputEvents(ProcessSession session, ProcessContext context, 
ComponentLog log) throws IOException {
         RawBinlogEvent rawBinlogEvent;
+        DataCaptureState dataCaptureState = currentDataCaptureState.copy();
+        final boolean includeBeginCommit = 
context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        final boolean includeDDLEvents = 
context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
 
         // Drain the queue
         while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
@@ -959,10 +910,10 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             // We always get ROTATE and FORMAT_DESCRIPTION messages no matter 
where we start (even from the end), and they won't have the correct "next 
position" value, so only
             // advance the position if it is not that type of event. ROTATE 
events don't generate output CDC events and have the current binlog position in 
a special field, which
             // is filled in during the ROTATE case
-            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && 
!useGtid) {
-                currentBinlogPosition = header.getPosition();
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && 
!currentDataCaptureState.isUseGtid()) {
+                dataCaptureState.setBinlogPosition(header.getPosition());
             }
-            log.debug("Message event, type={} pos={} file={}", eventType, 
currentBinlogPosition, currentBinlogFile);
+            log.debug("Message event, type={} pos={} file={}", eventType, 
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
             switch (eventType) {
                 case TABLE_MAP:
                     // This is sent to inform which table is about to be 
changed by subsequent events
@@ -974,139 +925,92 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
                     if (!skipTable) {
                         TableInfoCacheKey key = new 
TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), 
data.getTableId());
-                        if (cacheClient != null) {
+                        
binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+                        if (binlogResourceInfo.getCurrentTable() == null) {
+                            // We don't have an entry for this table yet, so 
fetch the info from the database and populate the cache
                             try {
-                                currentTable = cacheClient.get(key, 
cacheKeySerializer, cacheValueDeserializer);
-                            } catch (ConnectException ce) {
-                                throw new IOException("Could not connect to 
Distributed Map Cache server to get table information", ce);
-                            }
-
-                            if (currentTable == null) {
-                                // We don't have an entry for this table yet, 
so fetch the info from the database and populate the cache
-                                try {
-                                    currentTable = loadTableInfo(key);
-                                    try {
-                                        cacheClient.put(key, currentTable, 
cacheKeySerializer, cacheValueSerializer);
-                                    } catch (ConnectException ce) {
-                                        throw new IOException("Could not 
connect to Distributed Map Cache server to put table information", ce);
-                                    }
-                                } catch (SQLException se) {
-                                    // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
-                                    throw new IOException(se.getMessage(), se);
-                                }
+                                
binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+                                tableInfoCache.put(key, 
binlogResourceInfo.getCurrentTable());
+                            } catch (SQLException se) {
+                                // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
+                                throw new IOException(se.getMessage(), se);
                             }
-                        } else {
-                            // Populate a limited version of TableInfo without 
column information
-                            currentTable = new 
TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), 
Collections.emptyList());
                         }
                     } else {
-                        // Clear the current table, to force a reload next 
time we get a TABLE_MAP event we care about
-                        currentTable = null;
+                        // Clear the current table, to force reload next time 
we get a TABLE_MAP event we care about
+                        binlogResourceInfo.setCurrentTable(null);
                     }
                     break;
                 case QUERY:
                     QueryEventData queryEventData = event.getData();
-                    currentDatabase = queryEventData.getDatabase();
+                    
binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
 
                     String sql = queryEventData.getSql();
 
                     // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad 
happened, alert the user
-                        if (inTransaction) {
+                        if (binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("BEGIN event received at pos={} 
file={} while already processing a transaction. This could indicate that your 
binlog position is invalid "
-                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", currentBinlogPosition, 
currentBinlogFile);
+                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", 
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // Mark the current binlog position and GTID in case 
we have to rollback the transaction (if the processor is stopped, e.g.)
-                        xactBinlogFile = currentBinlogFile;
-                        xactBinlogPosition = currentBinlogPosition;
-                        xactSequenceId = currentSequenceId.get();
-                        xactGtidSet = currentGtidSet;
-
-                        if (includeBeginCommit && (databaseNamePattern == null 
|| databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = useGtid
-                                    ? new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                            currentEventInfo = beginEvent;
-                            currentEventWriter = beginEventWriter;
-                            
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, 
beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
+                        if (databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) 
{
+                            beginEventHandler.handleEvent(queryEventData, 
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, 
eventWriterConfiguration, currentSession, timestamp);
                         }
-                        inTransaction = true;
-                        //update inTransaction value to state
-                        updateState(session);
+                        // Whether we skip this event or not, it's still the 
beginning of a transaction
+                        binlogResourceInfo.setInTransaction(true);
+
+                        // Update inTransaction value to state
+                        updateState(session, dataCaptureState);
                     } else if ("COMMIT".equals(sql)) {
-                        if (!inTransaction) {
+                        // InnoDB generates XID events for "commit", but 
MyISAM generates Query events with "COMMIT", so handle that here
+                        if (!binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("COMMIT event received at pos={} 
file={} while not processing a transaction (i.e. no corresponding BEGIN event). 
"
                                     + "This could indicate that your binlog 
position is invalid or the event stream is out of sync or there was an issue 
with the processor state "
-                                    + "or there was an issue with the 
processor state.", currentBinlogPosition, currentBinlogFile);
+                                    + "or there was an issue with the 
processor state.", dataCaptureState.getBinlogPosition(), 
dataCaptureState.getBinlogFile());
                         }
-                        // InnoDB generates XID events for "commit", but 
MyISAM generates Query events with "COMMIT", so handle that here
-                        if (includeBeginCommit) {
-                            if (databaseNamePattern == null || 
databaseNamePattern.matcher(currentDatabase).matches()) {
-                                CommitTransactionEventInfo 
commitTransactionEvent = useGtid
-                                        ? new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                        : new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                                currentEventInfo = commitTransactionEvent;
-                                currentEventWriter = commitEventWriter;
-                                
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, 
eventWriterConfiguration));
-                            }
-                        } else {
-                            // If the COMMIT event is not to be written, the 
FlowFile should still be finished and the session committed.
-                            if (currentSession != null) {
-                                FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
-                                if (flowFile != null && currentEventWriter != 
null) {
-                                    // Flush the events to the FlowFile when 
the processor is stopped
-                                    
currentEventWriter.finishAndTransferFlowFile(currentSession, 
eventWriterConfiguration, transitUri, currentSequenceId.get(), 
currentEventInfo, REL_SUCCESS);
-                                }
-                                currentSession.commitAsync();
-                            }
+                        if (databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) 
{
+                            commitEventHandler.handleEvent(queryEventData, 
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, 
eventWriterConfiguration, currentSession, timestamp);
                         }
-
-                        //update inTransaction value to state
-                        inTransaction = false;
-                        updateState(session);
+                        // Whether we skip this event or not, it's the end of 
a transaction
+                        binlogResourceInfo.setInTransaction(false);
+                        updateState(session, dataCaptureState);
                         // If there is no FlowFile open, commit the session
                         if (eventWriterConfiguration.getCurrentFlowFile() == 
null) {
                             // Commit the NiFi session
                             session.commitAsync();
                         }
-                        currentTable = null;
+                        binlogResourceInfo.setCurrentTable(null);
+                        binlogResourceInfo.setCurrentDatabase(null);
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize 
the query to do string matching on the type of change
                         String normalizedQuery = normalizeQuery(sql);
 
-                        if (normalizedQuery.startsWith("alter table")
-                                || normalizedQuery.startsWith("alter ignore 
table")
-                                || normalizedQuery.startsWith("create table")
-                                || normalizedQuery.startsWith("truncate table")
-                                || normalizedQuery.startsWith("rename table")
-                                || normalizedQuery.startsWith("drop table")
-                                || normalizedQuery.startsWith("drop 
database")) {
-
-                            if (includeDDLEvents && (databaseNamePattern == 
null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                                // If we don't have table information, we can 
still use the database name
-                                TableInfo ddlTableInfo = (currentTable != 
null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
-                                DDLEventInfo ddlEvent = useGtid
-                                        ? new DDLEventInfo(ddlTableInfo, 
timestamp, currentGtidSet, sql)
-                                        : new DDLEventInfo(ddlTableInfo, 
timestamp, currentBinlogFile, currentBinlogPosition, sql);
-                                currentEventInfo = ddlEvent;
-                                currentEventWriter = ddlEventWriter;
-                                
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, 
ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-                            }
-                            // Remove all the keys from the cache that this 
processor added
-                            if (cacheClient != null) {
-                                
cacheClient.removeByPattern(this.getIdentifier() + ".*");
+                        if (isQueryDDL(normalizedQuery)) {
+                            if (databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) 
{
+                                if (queryEventData.getDatabase() == null) {
+                                    
queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase());
+                                }
+                                ddlEventHandler.handleEvent(queryEventData, 
includeDDLEvents, currentDataCaptureState, binlogResourceInfo,
+                                        binlogEventState, sql, 
eventWriterConfiguration, currentSession, timestamp);
+
+                                // The altered table may not be the "active" 
table, so clear the cache to pick up changes
+                                tableInfoCache.clear();
                             }
+
                             // If not in a transaction, commit the session so 
the DDL event(s) will be transferred
-                            if (includeDDLEvents && !inTransaction) {
-                                updateState(session);
+                            if (includeDDLEvents && 
!binlogResourceInfo.isInTransaction()) {
+                                updateState(session, dataCaptureState);
                                 if 
(FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
                                     if (currentSession != null) {
                                         FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
-                                        if (flowFile != null && 
currentEventWriter != null) {
+                                        if (flowFile != null && 
binlogEventState.getCurrentEventWriter() != null) {
                                             // Flush the events to the 
FlowFile when the processor is stopped
-                                            
currentEventWriter.finishAndTransferFlowFile(currentSession, 
eventWriterConfiguration, transitUri, currentSequenceId.get(), 
currentEventInfo, REL_SUCCESS);
+                                            
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession,
 eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
+                                                    
dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), 
REL_SUCCESS);
                                         }
                                     }
                                 }
@@ -1120,41 +1024,26 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                     break;
 
                 case XID:
-                    if (!inTransaction) {
+                    if (!binlogResourceInfo.isInTransaction()) {
                         getLogger().debug("COMMIT (XID) event received at 
pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN 
event). "
-                                + "This could indicate that your binlog 
position is invalid or the event stream is out of sync or there was an issue 
with the processor state.",
-                                currentBinlogPosition, currentBinlogFile);
+                                        + "This could indicate that your 
binlog position is invalid or the event stream is out of sync or there was an 
issue with the processor state.",
+                                dataCaptureState.getBinlogPosition(), 
dataCaptureState.getBinlogFile());
                     }
-                    if (includeBeginCommit) {
-                        if (databaseNamePattern == null || 
databaseNamePattern.matcher(currentDatabase).matches()) {
-                            CommitTransactionEventInfo commitTransactionEvent 
= useGtid
-                                    ? new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                            currentEventInfo = commitTransactionEvent;
-                            currentEventWriter = commitEventWriter;
-                            
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, 
eventWriterConfiguration));
-                        }
-                    } else {
-                        // If the COMMIT event is not to be written, the 
FlowFile should still be finished and the session committed.
-                        if (currentSession != null) {
-                            FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
-                            if (flowFile != null && currentEventWriter != 
null) {
-                                // Flush the events to the FlowFile when the 
processor is stopped
-                                
currentEventWriter.finishAndTransferFlowFile(currentSession, 
eventWriterConfiguration, transitUri, currentSequenceId.get(), 
currentEventInfo, REL_SUCCESS);
-                            }
-                        }
+                    if (databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) 
{
+                        commitEventHandler.handleEvent(event.getData(), 
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                binlogEventState, null, 
eventWriterConfiguration, currentSession, timestamp);
                     }
-                    // update inTransaction value and save next position
-                    // so when restart this processor,we will not read xid 
event again
-                    inTransaction = false;
-                    currentBinlogPosition = header.getNextPosition();
-                    updateState(session);
+                    // Whether we skip this event or not, it's the end of a 
transaction
+                    binlogResourceInfo.setInTransaction(false);
+                    
dataCaptureState.setBinlogPosition(header.getNextPosition());
+                    updateState(session, dataCaptureState);
                     // If there is no FlowFile open, commit the session
                     if (eventWriterConfiguration.getCurrentFlowFile() == null) 
{
+                        // Commit the NiFi session
                         session.commitAsync();
                     }
-                    currentTable = null;
-                    currentDatabase = null;
+                    binlogResourceInfo.setCurrentTable(null);
+                    binlogResourceInfo.setCurrentDatabase(null);
                     break;
 
                 case WRITE_ROWS:
@@ -1170,11 +1059,11 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                     if (skipTable) {
                         break;
                     }
-                    if (!inTransaction) {
+                    if (!binlogResourceInfo.isInTransaction()) {
                         // These events should only happen inside a 
transaction, warn the user otherwise
                         log.info("Event {} occurred outside of a transaction, 
which is unexpected.", eventType.name());
                     }
-                    if (currentTable == null && cacheClient != null) {
+                    if (binlogResourceInfo.getCurrentTable() == null) {
                         // No Table Map event was processed prior to this 
event, which should not happen, so throw an error
                         throw new RowEventException("No table information is 
available for this event, cannot process further.");
                     }
@@ -1183,52 +1072,42 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                             || eventType == EXT_WRITE_ROWS
                             || eventType == PRE_GA_WRITE_ROWS) {
 
-                        InsertRowsEventInfo eventInfo = useGtid
-                                ? new InsertRowsEventInfo(currentTable, 
timestamp, currentGtidSet, event.getData())
-                                : new InsertRowsEventInfo(currentTable, 
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentEventInfo = eventInfo;
-                        currentEventWriter = insertRowsWriter;
-                        
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, 
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+                        insertEventHandler.handleEvent(event.getData(), true, 
currentDataCaptureState, binlogResourceInfo,
+                                binlogEventState, null, 
eventWriterConfiguration, currentSession, timestamp);
 
                     } else if (eventType == DELETE_ROWS
                             || eventType == EXT_DELETE_ROWS
                             || eventType == PRE_GA_DELETE_ROWS) {
 
-                        DeleteRowsEventInfo eventInfo = useGtid
-                                ? new DeleteRowsEventInfo(currentTable, 
timestamp, currentGtidSet, event.getData())
-                                : new DeleteRowsEventInfo(currentTable, 
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentEventInfo = eventInfo;
-                        currentEventWriter = deleteRowsWriter;
-                        
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, 
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-
+                        deleteEventHandler.handleEvent(event.getData(), true, 
currentDataCaptureState, binlogResourceInfo,
+                                binlogEventState, null, 
eventWriterConfiguration, currentSession, timestamp);
                     } else {
                         // Update event
-                        UpdateRowsEventInfo eventInfo = useGtid
-                                ? new UpdateRowsEventInfo(currentTable, 
timestamp, currentGtidSet, event.getData())
-                                : new UpdateRowsEventInfo(currentTable, 
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
-                        currentEventInfo = eventInfo;
-                        currentEventWriter = updateRowsWriter;
-                        
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, 
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+                        updateEventHandler.handleEvent(event.getData(), true, 
currentDataCaptureState, binlogResourceInfo,
+                                binlogEventState, null, 
eventWriterConfiguration, currentSession, timestamp);
                     }
                     break;
 
                 case ROTATE:
-                    if (!useGtid) {
+                    if (!currentDataCaptureState.isUseGtid()) {
                         // Update current binlog filename
                         RotateEventData rotateEventData = event.getData();
-                        currentBinlogFile = 
rotateEventData.getBinlogFilename();
-                        currentBinlogPosition = 
rotateEventData.getBinlogPosition();
+                        
dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename());
+                        
dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition());
                     }
-                    updateState(session);
+                    updateState(session, dataCaptureState);
                     break;
 
                 case GTID:
-                    if (useGtid) {
+                    if (currentDataCaptureState.isUseGtid()) {
                         // Update current binlog gtid
                         GtidEventData gtidEventData = event.getData();
-                        gtidSet.add(gtidEventData.getGtid());
-                        currentGtidSet = gtidSet.toString();
-                        updateState(session);
+                        MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid();
+                        if (mySqlGtid != null) {
+                            gtidSet.add(mySqlGtid.toString());
+                            dataCaptureState.setGtidSet(gtidSet.toString());
+                            updateState(session, dataCaptureState);
+                        }
                     }
                     break;
 
@@ -1239,12 +1118,22 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             // Advance the current binlog position. This way if no more events 
are received and the processor is stopped, it will resume after the event that 
was just processed.
             // We always get ROTATE and FORMAT_DESCRIPTION messages no matter 
where we start (even from the end), and they won't have the correct "next 
position" value, so only
             // advance the position if it is not that type of event.
-            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && 
!useGtid && eventType != XID) {
-                currentBinlogPosition = header.getNextPosition();
+            if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && 
!currentDataCaptureState.isUseGtid() && eventType != XID) {
+                dataCaptureState.setBinlogPosition(header.getNextPosition());
             }
         }
     }
 
+    private boolean isQueryDDL(String sql) {
+        return sql.startsWith("alter table")
+                || sql.startsWith("alter ignore table")
+                || sql.startsWith("create table")
+                || sql.startsWith("truncate table")
+                || sql.startsWith("rename table")
+                || sql.startsWith("drop table")
+                || sql.startsWith("drop database");
+    }
+
     protected void clearState() throws IOException {
         if (currentSession == null) {
             throw new IllegalStateException("No current session");
@@ -1263,7 +1152,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         return normalizedQuery;
     }
 
-    protected void stop() throws CDCException {
+    @OnStopped
+    public void stop() throws CDCException {
         try {
             if (eventListener != null) {
                 eventListener.stop();
@@ -1277,14 +1167,18 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
             if (currentSession != null) {
                 FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
-                if (flowFile != null && currentEventWriter != null) {
+                if (flowFile != null && 
binlogEventState.getCurrentEventWriter() != null) {
                     // Flush the events to the FlowFile when the processor is 
stopped
-                    
currentEventWriter.finishAndTransferFlowFile(currentSession, 
eventWriterConfiguration, transitUri, currentSequenceId.get(), 
currentEventInfo, REL_SUCCESS);
+                    
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(
+                            currentSession,
+                            eventWriterConfiguration,
+                            binlogResourceInfo.getTransitUri(),
+                            currentDataCaptureState.getSequenceId(), 
binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
                 }
                 currentSession.commitAsync();
             }
 
-            currentBinlogPosition = -1;
+            currentDataCaptureState.setBinlogPosition(-1);
         } catch (IOException e) {
             throw new CDCException("Error closing CDC connection", e);
         } finally {
@@ -1296,29 +1190,30 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         }
     }
 
-    private void updateState(ProcessSession session) throws IOException {
-        updateState(session, currentBinlogFile, currentBinlogPosition, 
currentSequenceId.get(), currentGtidSet, inTransaction);
+    private void updateState(ProcessSession session, DataCaptureState 
dataCaptureState) throws IOException {
+        updateState(session, dataCaptureState, 
binlogResourceInfo.isInTransaction());
     }
 
-    private void updateState(ProcessSession session, String binlogFile, long 
binlogPosition, long sequenceId, String gtidSet, boolean inTransaction) throws 
IOException {
+    private void updateState(ProcessSession session, DataCaptureState 
dataCaptureState, boolean inTransaction) throws IOException {
         // Update state with latest values
         final Map<String, String> newStateMap = new 
HashMap<>(session.getState(Scope.CLUSTER).toMap());
 
         // Save current binlog filename, position and GTID to the state map
-        if (binlogFile != null) {
-            newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
+        if (dataCaptureState.getBinlogFile() != null) {
+            newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, 
dataCaptureState.getBinlogFile());
         }
 
-        newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, 
Long.toString(binlogPosition));
-        newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+        newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, 
Long.toString(dataCaptureState.getBinlogPosition()));
+        newStateMap.put(SEQUENCE_ID_KEY, 
String.valueOf(dataCaptureState.getSequenceId()));
         //add inTransaction value into state
         newStateMap.put("inTransaction", inTransaction ? "true" : "false");
 
-        if (gtidSet != null) {
-            newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+        if (dataCaptureState.getGtidSet() != null) {
+            newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
dataCaptureState.getGtidSet());
         }
 
         session.setState(newStateMap, Scope.CLUSTER);
+        currentDataCaptureState = dataCaptureState;
     }
 
 
@@ -1391,8 +1286,13 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         private JDBCConnectionHolder(InetSocketAddress host, String username, 
String password, Map<String, String> customProperties, long 
connectionTimeoutMillis) {
             this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" 
+ host.getPort();
             connectionProps.putAll(customProperties);
-            connectionProps.put("user", username);
-            connectionProps.put("password", password);
+            if (username != null) {
+                connectionProps.put("user", username);
+                if (password != null) {
+                    connectionProps.put("password", password);
+                }
+            }
+
             this.connectionTimeoutMillis = connectionTimeoutMillis;
         }
 
@@ -1486,7 +1386,71 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         public Logger getParentLogger() throws SQLFeatureNotSupportedException 
{
             return driver.getParentLogger();
         }
+    }
+
+    public static class BinlogEventState {
+        private BinlogEventInfo currentEventInfo;
+        private AbstractBinlogEventWriter<? extends BinlogEventInfo> 
currentEventWriter;
+
+        public BinlogEventInfo getCurrentEventInfo() {
+            return currentEventInfo;
+        }
 
+        public void setCurrentEventInfo(BinlogEventInfo currentEventInfo) {
+            this.currentEventInfo = currentEventInfo;
+        }
+
+        public AbstractBinlogEventWriter<? extends BinlogEventInfo> 
getCurrentEventWriter() {
+            return currentEventWriter;
+        }
+
+        public void setCurrentEventWriter(AbstractBinlogEventWriter<? extends 
BinlogEventInfo> currentEventWriter) {
+            this.currentEventWriter = currentEventWriter;
+        }
     }
 
+    public static class BinlogResourceInfo {
+        private TableInfo currentTable = null;
+        private String currentDatabase = null;
+
+        private boolean inTransaction = false;
+
+        private String transitUri = "<unknown>";
+
+        public BinlogResourceInfo() {
+
+        }
+
+        public TableInfo getCurrentTable() {
+            return currentTable;
+        }
+
+        public void setCurrentTable(TableInfo currentTable) {
+            this.currentTable = currentTable;
+        }
+
+        public String getCurrentDatabase() {
+            return currentDatabase;
+        }
+
+        public void setCurrentDatabase(String currentDatabase) {
+            this.currentDatabase = currentDatabase;
+        }
+
+        public boolean isInTransaction() {
+            return inTransaction;
+        }
+
+        public void setInTransaction(boolean inTransaction) {
+            this.inTransaction = inTransaction;
+        }
+
+        public String getTransitUri() {
+            return transitUri;
+        }
+
+        public void setTransitUri(String transitUri) {
+            this.transitUri = transitUri;
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 83c3a66f69..20523cbc06 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -23,6 +23,7 @@ import com.github.shyiko.mysql.binlog.event.EventData
 import com.github.shyiko.mysql.binlog.event.EventHeaderV4
 import com.github.shyiko.mysql.binlog.event.EventType
 import com.github.shyiko.mysql.binlog.event.GtidEventData
+import com.github.shyiko.mysql.binlog.event.MySqlGtid
 import com.github.shyiko.mysql.binlog.event.QueryEventData
 import com.github.shyiko.mysql.binlog.event.RotateEventData
 import com.github.shyiko.mysql.binlog.event.TableMapEventData
@@ -30,7 +31,6 @@ import 
com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
 import com.github.shyiko.mysql.binlog.network.SSLMode
 import groovy.json.JsonSlurper
-import org.apache.commons.io.output.WriterOutputStream
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading
 import org.apache.nifi.cdc.event.ColumnDefinition
 import org.apache.nifi.cdc.event.TableInfo
@@ -40,22 +40,12 @@ import 
org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
 import org.apache.nifi.cdc.mysql.MockBinlogClient
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
 import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
-import org.apache.nifi.components.PropertyDescriptor
 import org.apache.nifi.components.state.Scope
-import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.distributed.cache.client.Deserializer
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
-import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
-import org.apache.nifi.distributed.cache.client.Serializer
 import org.apache.nifi.flowfile.attributes.CoreAttributes
-import org.apache.nifi.logging.ComponentLog
 import org.apache.nifi.processor.exception.ProcessException
 import org.apache.nifi.provenance.ProvenanceEventType
 import org.apache.nifi.reporting.InitializationException
 import org.apache.nifi.ssl.SSLContextService
-import org.apache.nifi.state.MockStateManager
-import org.apache.nifi.util.MockComponentLog
-import org.apache.nifi.util.MockControllerServiceInitializationContext
 import org.apache.nifi.util.TestRunner
 import org.apache.nifi.util.TestRunners
 import org.junit.jupiter.api.BeforeEach
@@ -67,8 +57,6 @@ import java.sql.ResultSet
 import java.sql.SQLException
 import java.sql.Statement
 import java.util.concurrent.TimeoutException
-import java.util.regex.Matcher
-import java.util.regex.Pattern
 
 import static org.junit.jupiter.api.Assertions.assertEquals
 import static org.junit.jupiter.api.Assertions.assertNotNull
@@ -378,13 +366,6 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        def clientProperties = [:]
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
'localhost')
-        testRunner.addControllerService('client', cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
-        testRunner.enableControllerService(cacheClient)
-
 
         testRunner.run(1, false, true)
 
@@ -521,7 +502,7 @@ class CaptureChangeMySQLTest {
     @Test
     void testExcludeSchemaChanges() throws Exception {
         testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION)
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Don't 
include port here, should default to 3306
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
@@ -530,13 +511,6 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false')
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        def clientProperties = [:]
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
'localhost')
-        testRunner.addControllerService('client', cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
-        testRunner.enableControllerService(cacheClient)
-
 
         testRunner.run(1, false, true)
 
@@ -595,16 +569,10 @@ class CaptureChangeMySQLTest {
     @Test
     void testNoTableInformationAvailable() throws Exception {
         testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
DRIVER_LOCATION)
-        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Port 
should default to 3306
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        def clientProperties = [:]
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
'localhost')
-        testRunner.addControllerService('client', cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
-        testRunner.enableControllerService(cacheClient)
 
         testRunner.run(1, false, true)
 
@@ -1106,12 +1074,7 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        def clientProperties = [:]
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
'localhost')
-        testRunner.addControllerService('client', cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
-        testRunner.enableControllerService(cacheClient)
+
         testRunner.run(1, false, true)
 
         // ROTATE
@@ -1174,19 +1137,13 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
         testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        def clientProperties = [:]
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
'localhost')
-        testRunner.addControllerService('client', cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
-        testRunner.enableControllerService(cacheClient)
 
         testRunner.run(1, false, true)
 
         // GTID
         client.sendEvent(new Event(
                 [timestamp: new Date().time, eventType: EventType.GTID, 
nextPosition: 2] as EventHeaderV4,
-                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1'] as 
GtidEventData
+                [gtid: MySqlGtid.fromString( 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')] as GtidEventData
         ))
 
         // BEGIN
@@ -1206,7 +1163,7 @@ class CaptureChangeMySQLTest {
         // Stop the processor and verify the state is set
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'', Scope.CLUSTER)
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'6', Scope.CLUSTER)
-        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1', Scope.CLUSTER)
 
         ((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
         testRunner.stateManager.clear(Scope.CLUSTER)
@@ -1218,7 +1175,7 @@ class CaptureChangeMySQLTest {
         // GTID
         client.sendEvent(new Event(
                 [timestamp: new Date().time, eventType: EventType.GTID, 
nextPosition: 8] as EventHeaderV4,
-                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2'] as 
GtidEventData
+                [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2'] as 
GtidEventData
         ))
 
         // BEGIN
@@ -1239,12 +1196,12 @@ class CaptureChangeMySQLTest {
 
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'', Scope.CLUSTER)
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'12', Scope.CLUSTER)
-        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-2', Scope.CLUSTER)
 
         // GTID
         client.sendEvent(new Event(
                 [timestamp: new Date().time, eventType: EventType.GTID, 
nextPosition: 14] as EventHeaderV4,
-                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as 
GtidEventData
+                [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as 
GtidEventData
         ))
 
         // BEGIN
@@ -1263,7 +1220,7 @@ class CaptureChangeMySQLTest {
 
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, 
'', Scope.CLUSTER)
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'18', Scope.CLUSTER)
-        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
+        
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3', Scope.CLUSTER)
     }
 
     @Test
@@ -1340,12 +1297,12 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
         testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
-        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
         testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
         testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 
'false')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
         testRunner.getStateManager().setState([
-                ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2',
+                ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2',
                 ("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1'
         ], Scope.CLUSTER)
 
@@ -1354,7 +1311,7 @@ class CaptureChangeMySQLTest {
         // GTID
         client.sendEvent(new Event(
                 [timestamp: new Date().time, eventType: EventType.GTID, 
nextPosition: 2] as EventHeaderV4,
-                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as 
GtidEventData
+                [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as 
GtidEventData
         ))
 
         // BEGIN
@@ -1375,7 +1332,7 @@ class CaptureChangeMySQLTest {
 
         assertEquals(2, resultFiles.size())
         assertEquals(
-                'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3',
+                'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3',
                 
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
         )
     }
@@ -1388,7 +1345,7 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
         testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
-        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
         testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 
'false')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
 
@@ -1397,7 +1354,7 @@ class CaptureChangeMySQLTest {
         // GTID
         client.sendEvent(new Event(
                 [timestamp: new Date().time, eventType: EventType.GTID, 
nextPosition: 2] as EventHeaderV4,
-                [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as 
GtidEventData
+                [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as 
GtidEventData
         ))
 
         // BEGIN
@@ -1418,7 +1375,7 @@ class CaptureChangeMySQLTest {
 
         assertEquals(2, resultFiles.size())
         assertEquals(
-                'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1:3-3',
+                'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1:3-3',
                 
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
         )
     }
@@ -1430,12 +1387,6 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root")
         testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds")
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true")
-        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
-        Map<String, String> clientProperties = new HashMap<>()
-        
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 
"localhost")
-        testRunner.addControllerService("client", cacheClient, 
clientProperties)
-        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client")
-        testRunner.enableControllerService(cacheClient)
 
         testRunner.run(1, false, true)
         // COMMIT
@@ -1496,131 +1447,5 @@ class CaptureChangeMySQLTest {
             
when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet)
             return mockConnection
         }
-
-    }
-
-    static DistributedMapCacheClientImpl createCacheClient() throws 
InitializationException {
-
-        final DistributedMapCacheClientImpl client = new 
DistributedMapCacheClientImpl()
-        final ComponentLog logger = new MockComponentLog("client", client)
-        final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client", logger, new 
MockStateManager(client))
-
-        client.initialize(clientInitContext)
-
-        return client
-    }
-
-    static
-    final class DistributedMapCacheClientImpl extends 
AbstractControllerService implements DistributedMapCacheClient {
-
-        private Map<String, String> cacheMap = new HashMap<>()
-
-        @Override
-        void close() throws IOException {
-        }
-
-        @Override
-        void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
-        }
-
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-
-            return [DistributedMapCacheClientService.HOSTNAME,
-                    DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
-                    DistributedMapCacheClientService.PORT,
-                    DistributedMapCacheClientService.SSL_CONTEXT_SERVICE]
-        }
-
-        @Override
-        <K, V> boolean putIfAbsent(
-                final K key,
-                final V value,
-                final Serializer<K> keySerializer, final Serializer<V> 
valueSerializer) throws IOException {
-
-            StringWriter keyWriter = new StringWriter()
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter))
-            String keyString = keyWriter.toString()
-
-            if (cacheMap.containsKey(keyString)) return false
-
-            StringWriter valueWriter = new StringWriter()
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter))
-            return true
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        <K, V> V getAndPutIfAbsent(
-                final K key, final V value, final Serializer<K> keySerializer, 
final Serializer<V> valueSerializer,
-                final Deserializer<V> valueDeserializer) throws IOException {
-            StringWriter keyWriter = new StringWriter()
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter))
-            String keyString = keyWriter.toString()
-
-            if (cacheMap.containsKey(keyString)) return 
valueDeserializer.deserialize(cacheMap.get(keyString).bytes)
-
-            StringWriter valueWriter = new StringWriter()
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter))
-            return null
-        }
-
-        @Override
-        <K> boolean containsKey(final K key, final Serializer<K> 
keySerializer) throws IOException {
-            StringWriter keyWriter = new StringWriter()
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter))
-            String keyString = keyWriter.toString()
-
-            return cacheMap.containsKey(keyString)
-        }
-
-        @Override
-        <K, V> V get(
-                final K key,
-                final Serializer<K> keySerializer, final Deserializer<V> 
valueDeserializer) throws IOException {
-            StringWriter keyWriter = new StringWriter()
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter))
-            String keyString = keyWriter.toString()
-
-            return (cacheMap.containsKey(keyString)) ? 
valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null
-        }
-
-        @Override
-        <K> boolean remove(final K key, final Serializer<K> serializer) throws 
IOException {
-            StringWriter keyWriter = new StringWriter()
-            serializer.serialize(key, new WriterOutputStream(keyWriter))
-            String keyString = keyWriter.toString()
-
-            boolean removed = (cacheMap.containsKey(keyString))
-            cacheMap.remove(keyString)
-            return removed
-        }
-
-        @Override
-        long removeByPattern(String regex) throws IOException {
-            final List<String> removedRecords = new ArrayList<>()
-            Pattern p = Pattern.compile(regex)
-            for (String key : cacheMap.keySet()) {
-                // Key must be backed by something that can be converted into 
a String
-                Matcher m = p.matcher(key)
-                if (m.matches()) {
-                    removedRecords.add(cacheMap.get(key))
-                }
-            }
-            final long numRemoved = removedRecords.size()
-            removedRecords.each {cacheMap.remove(it)}
-            return numRemoved
-        }
-
-        @Override
-        <K, V> void put(
-                final K key,
-                final V value,
-                final Serializer<K> keySerializer, final Serializer<V> 
valueSerializer) throws IOException {
-            StringWriter keyWriter = new StringWriter()
-            keySerializer.serialize(key, new WriterOutputStream(keyWriter))
-            StringWriter valueWriter = new StringWriter()
-            valueSerializer.serialize(value, new 
WriterOutputStream(valueWriter))
-        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
similarity index 65%
rename from nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
rename to nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
index 73573a2bd5..a34565ddf6 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
@@ -14,7 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.cdc.mysql.event;
+package org.apache.nifi.cdc.mysql.event.io;
+
 
 import org.junit.jupiter.api.Test;
 
@@ -23,17 +24,15 @@ import java.sql.Types;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
+class TestInsertRowsWriter {
 
-/**
- * Unit Tests for MySQLCDCUtils utility class
- */
-public class MySQLCDCUtilsTest {
     @Test
-    public void testGetWritableObject() throws Exception {
-        assertNull(MySQLCDCUtils.getWritableObject(null, null));
-        assertNull(MySQLCDCUtils.getWritableObject(Types.INTEGER, null));
-        assertEquals((byte) 1, MySQLCDCUtils.getWritableObject(Types.INTEGER, 
(byte) 1));
-        assertEquals("Hello", MySQLCDCUtils.getWritableObject(Types.VARCHAR, 
"Hello".getBytes()));
+    public void testGetWritableObject() {
+        InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+        assertNull(insertRowsWriter.getWritableObject(null, null));
+        assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null));
+        assertEquals((byte) 1, insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1));
+        assertEquals("Hello", insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes()));
     }
 
 }
\ No newline at end of file

Reply via email to