mattyb149 commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1183708724


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     private static final List<PropertyDescriptor> propDescriptors;
 
-    private volatile ProcessSession currentSession;
-    private BinaryLogClient binlogClient;
-    private BinlogEventListener eventListener;
-    private BinlogLifecycleListener lifecycleListener;
-    private GtidSet gtidSet;
+    private volatile BinaryLogClient binlogClient;
+    private volatile BinlogEventListener eventListener;
+    private volatile BinlogLifecycleListener lifecycleListener;
+    private volatile GtidSet gtidSet;
 
     // Set queue capacity to avoid excessive memory consumption
     private final BlockingQueue<RawBinlogEvent> queue = new 
LinkedBlockingQueue<>(1000);
-    private volatile String currentBinlogFile = null;
-    private volatile long currentBinlogPosition = 4;
-    private volatile String currentGtidSet = null;
-
-    // The following variables save the value of the binlog filename, 
position, (sequence id), and gtid at the beginning of a transaction. Used for 
rollback
-    private volatile String xactBinlogFile = null;
-    private volatile long xactBinlogPosition = 4;
-    private volatile long xactSequenceId = 0;
-    private volatile String xactGtidSet = null;
-
-    private volatile TableInfo currentTable = null;
-    private volatile String currentDatabase = null;
-    private volatile Pattern databaseNamePattern;
-    private volatile Pattern tableNamePattern;
-    private volatile boolean includeBeginCommit = false;
-    private volatile boolean includeDDLEvents = false;
-    private volatile boolean useGtid = false;
 
-    private volatile boolean inTransaction = false;
-    private volatile boolean skipTable = false;
-    private final AtomicBoolean hasRun = new AtomicBoolean(false);
+    private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new 
HashMap<>();
 
-    private int currentHost = 0;
-    private String transitUri = "<unknown>";
+    private volatile ProcessSession currentSession;
+    private DataCaptureState currentDataCaptureState = new DataCaptureState();
 
-    private final AtomicLong currentSequenceId = new AtomicLong(0);
+    private volatile BinlogResourceInfo binlogResourceInfo = new 
BinlogResourceInfo();
 
-    private volatile DistributedMapCacheClient cacheClient = null;
-    private final Serializer<TableInfoCacheKey> cacheKeySerializer = new 
TableInfoCacheKey.Serializer();
-    private final Serializer<TableInfo> cacheValueSerializer = new 
TableInfo.Serializer();
-    private final Deserializer<TableInfo> cacheValueDeserializer = new 
TableInfo.Deserializer();
+    private volatile Pattern databaseNamePattern;
+    private volatile Pattern tableNamePattern;
+    private volatile boolean skipTable = false;
+    private int currentHost = 0;
+    private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
 
-    private JDBCConnectionHolder jdbcConnectionHolder = null;
+    private final BinlogEventState binlogEventState = new BinlogEventState();
 
     private final BeginTransactionEventWriter beginEventWriter = new 
BeginTransactionEventWriter();
+    private final BeginEventHandler beginEventHandler = new 
BeginEventHandler();
     private final CommitTransactionEventWriter commitEventWriter = new 
CommitTransactionEventWriter();
+    private final CommitEventHandler commitEventHandler = new 
CommitEventHandler();
     private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
+    private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
     private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+    private final InsertEventHandler insertEventHandler = new 
InsertEventHandler();

Review Comment:
   That was what I was hoping to do, but I think the required parameters made 
the interface method pretty awkward. I can take another look



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to