nandorsoma commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1176308032


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData, 
DDLEventInfo> {
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<DDLEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null)

Review Comment:
   Parenthesis is not needed and it can be final. Also in other cases variables 
could be final. Could you fix them?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData, 
DDLEventInfo> {
+    @Override
+    public void handleEvent(final QueryEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<DDLEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null)
+                ? binlogResourceInfo.getCurrentTable()
+                : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, 
null, null);
+        DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()

Review Comment:
   I think ddlTableInfo and ddlEvent creation could go into the `if 
(writeEvent)` block to eliminate unnecessary memory allocation. 



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DeleteEventHandler implements 
BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> {
+    @Override
+    public void handleEvent(final DeleteRowsEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<DeleteRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same as at DDLEventHandler. We could move those lines into the `if 
(writeEvent)` block.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class UpdateEventHandler implements 
BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> {
+    @Override
+    public void handleEvent(final UpdateRowsEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<UpdateRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        UpdateRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same like in the other cases.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java:
##########
@@ -19,12 +19,35 @@
 import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
 
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * An abstract base class for writing MYSQL table-related binlog events into 
flow file(s), e.g.
  */
 public abstract class AbstractBinlogTableEventWriter<T extends 
BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
 
+    protected Object getWritableObject(Integer type, Serializable value) {
+        if (value == null) {
+            return null;
+        }
+        if (type == null) {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else if (value instanceof Number) {
+                return value;
+            }
+        } else if (value instanceof Number) {
+            return value;
+        } else {
+            if (value instanceof byte[]) {
+                return new String((byte[]) value);
+            } else {
+                return value.toString();
+            }
+        }
+        return null;
+    }

Review Comment:
   I think it is a bit easier to read this way:
   ```suggestion
       protected Object getWritableObject(Integer type, Serializable value) {
           if (value == null) {
               return null;
           }
   
           if (type == null) {
               if (value instanceof byte[]) {
                   return new String((byte[]) value);
               } else if (value instanceof Number) {
                   return value;
               } else {
                   return null;
               }
           } else {
               if (value instanceof byte[]) {
                   return new String((byte[]) value);
               } else if (value instanceof Number) {
                   return value;
               } else {
                   return value.toString();
               }
           }
       }
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class InsertEventHandler implements 
BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+    @Override
+    public void handleEvent(final WriteRowsEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()

Review Comment:
   Same like in the other cases.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends 
BinlogEventInfo> {
+
+    void handleEvent(T eventData,
+                     final boolean writeEvent,

Review Comment:
   I'm not really in favor of using finals, but I think when we use them, we 
should use them consistently. It seems like other attributes can be final in 
this signature.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -228,13 +225,14 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor HOSTS = new 
PropertyDescriptor.Builder()
             .name("capture-change-mysql-hosts")
-            .displayName("MySQL Hosts")
-            .description("A list of hostname/port entries corresponding to 
nodes in a MySQL cluster. The entries should be comma separated "
-                    + "using a colon such as host1:port,host2:port,....  For 
example mysql.myhost.com:3306. This processor will attempt to connect to "
+            .displayName("MySQL Nodes")

Review Comment:
   In a separate PR we could rename this property on the main branch. What do 
you think?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class CommitEventHandler implements BinlogEventHandler<EventData, 
CommitTransactionEventInfo> {
+    @Override
+    public void handleEvent(final EventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<CommitTransactionEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        final String currentDatabase = binlogResourceInfo.getCurrentDatabase();

Review Comment:
   As in the other cases it is enough to create currentDatabase and commitEvent 
in the `if (writeEvent)` block.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -317,10 +315,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     public static final PropertyDescriptor DIST_CACHE_CLIENT = new 
PropertyDescriptor.Builder()
             .name("capture-change-mysql-dist-map-cache-client")
-            .displayName("Distributed Map Cache Client")
-            .description("Identifies a Distributed Map Cache Client controller 
service to be used for keeping information about the various table columns, 
datatypes, etc. "
-                    + "needed by the processor. If a client is not specified, 
the generated events will not include column type or name information (but they 
will include database "
-                    + "and table information.")
+            .displayName("Distributed Map Cache Client - unused")

Review Comment:
   In a separate PR we could remove this property on the main branch. I also 
see that there is another deprecated property "State Update Interval". Maybe we 
could remove that also. What do you think?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, 
position, (sequence id), and gtid at the beginning of a transaction. Used for 
rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new 
HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new 
BinlogResourceInfo();

Review Comment:
   It's my personal taste, so feel free to ignore it, but I think it is better 
to initialize the objects when you actually fill then up with values. In this 
case in the setup method. Also when you stop the processor, you can clean it up 
by assigning null to it. This way it will use less memory when the processor is 
stopped. What do you think?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends 
BinlogEventInfo> {
+
+    void handleEvent(T eventData,
+                     final boolean writeEvent,
+                     DataCaptureState dataCaptureState,
+                     CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo,
+                     CaptureChangeMySQL.BinlogEventState binlogEventState,
+                     final String sql,
+                     AbstractBinlogEventWriter<S> eventWriter,

Review Comment:
   It seems like there is a 1:1 connection between EventHandlers and 
EventWriters. So InsertEventHandler will always use InsertEventWriter. Is there 
a specific reason then that the eventWriter is coming from "outside"? Am I 
missing something?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, 
ComponentLog log) throws IOExce
 
                     if (!skipTable) {
                         TableInfoCacheKey key = new 
TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), 
data.getTableId());
-                        if (cacheClient != null) {
+                        
binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+                        if (binlogResourceInfo.getCurrentTable() == null) {
+                            // We don't have an entry for this table yet, so 
fetch the info from the database and populate the cache
                             try {
-                                currentTable = cacheClient.get(key, 
cacheKeySerializer, cacheValueDeserializer);
-                            } catch (ConnectException ce) {
-                                throw new IOException("Could not connect to 
Distributed Map Cache server to get table information", ce);
-                            }
-
-                            if (currentTable == null) {
-                                // We don't have an entry for this table yet, 
so fetch the info from the database and populate the cache
-                                try {
-                                    currentTable = loadTableInfo(key);
-                                    try {
-                                        cacheClient.put(key, currentTable, 
cacheKeySerializer, cacheValueSerializer);
-                                    } catch (ConnectException ce) {
-                                        throw new IOException("Could not 
connect to Distributed Map Cache server to put table information", ce);
-                                    }
-                                } catch (SQLException se) {
-                                    // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
-                                    throw new IOException(se.getMessage(), se);
-                                }
+                                
binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+                                tableInfoCache.put(key, 
binlogResourceInfo.getCurrentTable());
+                            } catch (SQLException se) {
+                                // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
+                                throw new IOException(se.getMessage(), se);
                             }
-                        } else {
-                            // Populate a limited version of TableInfo without 
column information
-                            currentTable = new 
TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), 
Collections.emptyList());
                         }
                     } else {
-                        // Clear the current table, to force a reload next 
time we get a TABLE_MAP event we care about
-                        currentTable = null;
+                        // Clear the current table, to force reload next time 
we get a TABLE_MAP event we care about
+                        binlogResourceInfo.setCurrentTable(null);
                     }
                     break;
                 case QUERY:
                     QueryEventData queryEventData = event.getData();
-                    currentDatabase = queryEventData.getDatabase();
+                    
binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
 
                     String sql = queryEventData.getSql();
 
                     // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad 
happened, alert the user
-                        if (inTransaction) {
+                        if (binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("BEGIN event received at pos={} 
file={} while already processing a transaction. This could indicate that your 
binlog position is invalid "
-                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", currentBinlogPosition, 
currentBinlogFile);
+                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", 
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // Mark the current binlog position and GTID in case 
we have to rollback the transaction (if the processor is stopped, e.g.)
-                        xactBinlogFile = currentBinlogFile;
-                        xactBinlogPosition = currentBinlogPosition;
-                        xactSequenceId = currentSequenceId.get();
-                        xactGtidSet = currentGtidSet;
-
-                        if (includeBeginCommit && (databaseNamePattern == null 
|| databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = useGtid
-                                    ? new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                            currentEventInfo = beginEvent;
-                            currentEventWriter = beginEventWriter;
-                            
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, 
beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
+                        if (!(databaseNamePattern != null && 
!databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {

Review Comment:
   I think this can be simplified.
   ```suggestion
                           if ((databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, 
position, (sequence id), and gtid at the beginning of a transaction. Used for 
rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new 
HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new 
BinlogResourceInfo();
 
-    private volatile DistributedMapCacheClient cacheClient = null;
-    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new 
TableInfoCacheKey.Serializer();
-    private final Serializer<TableInfo> cacheValueSerializer = new 
TableInfo.Serializer();
-    private final Deserializer<TableInfo> cacheValueDeserializer = new 
TableInfo.Deserializer();
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+    private volatile boolean skipTable = false;
+    private int currentHost = 0;
+    private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
 
-    private JDBCConnectionHolder jdbcConnectionHolder = null;
+    private final BinlogEventState binlogEventState = new BinlogEventState();
 
     private final BeginTransactionEventWriter beginEventWriter = new 
BeginTransactionEventWriter();
+    private final BeginEventHandler beginEventHandler = new 
BeginEventHandler();
     private final CommitTransactionEventWriter commitEventWriter = new 
CommitTransactionEventWriter();
+    private final CommitEventHandler commitEventHandler = new 
CommitEventHandler();
     private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
+    private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
     private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+    private final InsertEventHandler insertEventHandler = new 
InsertEventHandler();

Review Comment:
   I'm wondering, is there an option to extract these handlers to a different 
class, that is able to retreive the required handler? This could greatly 
simplify the code, but I'm not sure that we can shrink the required parameters 
to a simple enough interface.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, 
ComponentLog log) throws IOExce
 
                     if (!skipTable) {
                         TableInfoCacheKey key = new 
TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), 
data.getTableId());
-                        if (cacheClient != null) {
+                        
binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+                        if (binlogResourceInfo.getCurrentTable() == null) {
+                            // We don't have an entry for this table yet, so 
fetch the info from the database and populate the cache
                             try {
-                                currentTable = cacheClient.get(key, 
cacheKeySerializer, cacheValueDeserializer);
-                            } catch (ConnectException ce) {
-                                throw new IOException("Could not connect to 
Distributed Map Cache server to get table information", ce);
-                            }
-
-                            if (currentTable == null) {
-                                // We don't have an entry for this table yet, 
so fetch the info from the database and populate the cache
-                                try {
-                                    currentTable = loadTableInfo(key);
-                                    try {
-                                        cacheClient.put(key, currentTable, 
cacheKeySerializer, cacheValueSerializer);
-                                    } catch (ConnectException ce) {
-                                        throw new IOException("Could not 
connect to Distributed Map Cache server to put table information", ce);
-                                    }
-                                } catch (SQLException se) {
-                                    // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
-                                    throw new IOException(se.getMessage(), se);
-                                }
+                                
binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+                                tableInfoCache.put(key, 
binlogResourceInfo.getCurrentTable());
+                            } catch (SQLException se) {
+                                // Propagate the error up, so things like 
rollback and logging/bulletins can be handled
+                                throw new IOException(se.getMessage(), se);
                             }
-                        } else {
-                            // Populate a limited version of TableInfo without 
column information
-                            currentTable = new 
TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), 
Collections.emptyList());
                         }
                     } else {
-                        // Clear the current table, to force a reload next 
time we get a TABLE_MAP event we care about
-                        currentTable = null;
+                        // Clear the current table, to force reload next time 
we get a TABLE_MAP event we care about
+                        binlogResourceInfo.setCurrentTable(null);
                     }
                     break;
                 case QUERY:
                     QueryEventData queryEventData = event.getData();
-                    currentDatabase = queryEventData.getDatabase();
+                    
binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
 
                     String sql = queryEventData.getSql();
 
                     // Is this the start of a transaction?
                     if ("BEGIN".equals(sql)) {
                         // If we're already in a transaction, something bad 
happened, alert the user
-                        if (inTransaction) {
+                        if (binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("BEGIN event received at pos={} 
file={} while already processing a transaction. This could indicate that your 
binlog position is invalid "
-                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", currentBinlogPosition, 
currentBinlogFile);
+                                    + "or the event stream is out of sync or 
there was an issue with the processor state.", 
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
                         }
-                        // Mark the current binlog position and GTID in case 
we have to rollback the transaction (if the processor is stopped, e.g.)
-                        xactBinlogFile = currentBinlogFile;
-                        xactBinlogPosition = currentBinlogPosition;
-                        xactSequenceId = currentSequenceId.get();
-                        xactGtidSet = currentGtidSet;
-
-                        if (includeBeginCommit && (databaseNamePattern == null 
|| databaseNamePattern.matcher(currentDatabase).matches())) {
-                            BeginTransactionEventInfo beginEvent = useGtid
-                                    ? new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                    : new 
BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                            currentEventInfo = beginEvent;
-                            currentEventWriter = beginEventWriter;
-                            
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, 
beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 
+                        if (!(databaseNamePattern != null && 
!databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {
+                            beginEventHandler.handleEvent(queryEventData, 
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, beginEventWriter, 
eventWriterConfiguration, currentSession, timestamp);
                         }
-                        inTransaction = true;
-                        //update inTransaction value to state
-                        updateState(session);
+                        // Whether we skip this event or not, it's still the 
beginning of a transaction
+                        binlogResourceInfo.setInTransaction(true);
+
+                        // Update inTransaction value to state
+                        updateState(session, dataCaptureState);
                     } else if ("COMMIT".equals(sql)) {
-                        if (!inTransaction) {
+                        // InnoDB generates XID events for "commit", but 
MyISAM generates Query events with "COMMIT", so handle that here
+                        if (!binlogResourceInfo.isInTransaction()) {
                             getLogger().debug("COMMIT event received at pos={} 
file={} while not processing a transaction (i.e. no corresponding BEGIN event). 
"
                                     + "This could indicate that your binlog 
position is invalid or the event stream is out of sync or there was an issue 
with the processor state "
-                                    + "or there was an issue with the 
processor state.", currentBinlogPosition, currentBinlogFile);
+                                    + "or there was an issue with the 
processor state.", dataCaptureState.getBinlogPosition(), 
dataCaptureState.getBinlogFile());
                         }
-                        // InnoDB generates XID events for "commit", but 
MyISAM generates Query events with "COMMIT", so handle that here
-                        if (includeBeginCommit) {
-                            if (databaseNamePattern == null || 
databaseNamePattern.matcher(currentDatabase).matches()) {
-                                CommitTransactionEventInfo 
commitTransactionEvent = useGtid
-                                        ? new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
-                                        : new 
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, 
currentBinlogPosition);
-                                currentEventInfo = commitTransactionEvent;
-                                currentEventWriter = commitEventWriter;
-                                
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, 
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, 
eventWriterConfiguration));
-                            }
-                        } else {
-                            // If the COMMIT event is not to be written, the 
FlowFile should still be finished and the session committed.
-                            if (currentSession != null) {
-                                FlowFile flowFile = 
eventWriterConfiguration.getCurrentFlowFile();
-                                if (flowFile != null && currentEventWriter != 
null) {
-                                    // Flush the events to the FlowFile when 
the processor is stopped
-                                    
currentEventWriter.finishAndTransferFlowFile(currentSession, 
eventWriterConfiguration, transitUri, currentSequenceId.get(), 
currentEventInfo, REL_SUCCESS);
-                                }
-                                currentSession.commitAsync();
-                            }
+                        if (!(databaseNamePattern != null && 
!databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {
+                            commitEventHandler.handleEvent(queryEventData, 
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+                                    binlogEventState, sql, commitEventWriter, 
eventWriterConfiguration, currentSession, timestamp);
                         }
-
-                        //update inTransaction value to state
-                        inTransaction = false;
-                        updateState(session);
+                        // Whether we skip this event or not, it's the end of 
a transaction
+                        binlogResourceInfo.setInTransaction(false);
+                        updateState(session, dataCaptureState);
                         // If there is no FlowFile open, commit the session
                         if (eventWriterConfiguration.getCurrentFlowFile() == 
null) {
                             // Commit the NiFi session
                             session.commitAsync();
                         }
-                        currentTable = null;
+                        binlogResourceInfo.setCurrentTable(null);
+                        binlogResourceInfo.setCurrentDatabase(null);
                     } else {
                         // Check for DDL events (alter table, e.g.). Normalize 
the query to do string matching on the type of change
                         String normalizedQuery = normalizeQuery(sql);
 
-                        if (normalizedQuery.startsWith("alter table")
-                                || normalizedQuery.startsWith("alter ignore 
table")
-                                || normalizedQuery.startsWith("create table")
-                                || normalizedQuery.startsWith("truncate table")
-                                || normalizedQuery.startsWith("rename table")
-                                || normalizedQuery.startsWith("drop table")
-                                || normalizedQuery.startsWith("drop 
database")) {
-
-                            if (includeDDLEvents && (databaseNamePattern == 
null || databaseNamePattern.matcher(currentDatabase).matches())) {
-                                // If we don't have table information, we can 
still use the database name
-                                TableInfo ddlTableInfo = (currentTable != 
null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
-                                DDLEventInfo ddlEvent = useGtid
-                                        ? new DDLEventInfo(ddlTableInfo, 
timestamp, currentGtidSet, sql)
-                                        : new DDLEventInfo(ddlTableInfo, 
timestamp, currentBinlogFile, currentBinlogPosition, sql);
-                                currentEventInfo = ddlEvent;
-                                currentEventWriter = ddlEventWriter;
-                                
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, 
ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-                            }
-                            // Remove all the keys from the cache that this 
processor added
-                            if (cacheClient != null) {
-                                
cacheClient.removeByPattern(this.getIdentifier() + ".*");
+                        if (isQueryDDL(normalizedQuery)) {
+                            if (!(databaseNamePattern != null && 
!databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {

Review Comment:
   This can be simplified.
   ```suggestion
                               if (databaseNamePattern == null || 
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()))
 {
   ```



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -839,19 +802,18 @@ protected void connect(List<InetSocketAddress> hosts, 
String username, String pa
         InetSocketAddress connectedHost = null;
         Exception lastConnectException = new Exception("Unknown connection 
error");
 
-        if (createEnrichmentConnection) {
-            try {
-                // Ensure driverLocation and driverName are correct before 
establishing binlog connection
-                // to avoid failing after binlog messages are received.
-                // Actual JDBC connection is created after binlog client gets 
started, because we need
-                // the connect-able host same as the binlog client.
-                registerDriver(driverLocation, driverName);
-            } catch (InitializationException e) {
-                throw new RuntimeException("Failed to register JDBC driver. 
Ensure MySQL Driver Location(s)" +
-                        " and MySQL Driver Class Name are configured 
correctly. " + e, e);
-            }
+        try {

Review Comment:
   Can you explain why `createEnrichmentConnection` check is no longer needed? 
I don't follow.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1391,8 +1296,12 @@ private class JDBCConnectionHolder {
         private JDBCConnectionHolder(InetSocketAddress host, String username, 
String password, Map<String, String> customProperties, long 
connectionTimeoutMillis) {
             this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" 
+ host.getPort();
             connectionProps.putAll(customProperties);
-            connectionProps.put("user", username);
-            connectionProps.put("password", password);
+            if (username != null) {
+                connectionProps.put("user", username);
+            }
+            if (password != null) {

Review Comment:
   Does it make sense to set password when username is null?



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static 
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class InsertEventHandler implements 
BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+    @Override
+    public void handleEvent(final WriteRowsEventData eventData, final boolean 
writeEvent, DataCaptureState dataCaptureState,
+                            CaptureChangeMySQL.BinlogResourceInfo 
binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, 
AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter,
+                            EventWriterConfiguration eventWriterConfiguration, 
ProcessSession session, final long timestamp) {
+        InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
+                ? new 
InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, 
dataCaptureState.getGtidSet(), eventData)
+                : new 
InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, 
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), 
eventData);
+        if (writeEvent) {

Review Comment:
   There are multiple cases when the handler does nothing when it is not a 
write event. If the handler would be able to decide alone whether it is a write 
event or not, it would be ok. But in this case, I think we should even call the 
event handler. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to