mattyb149 commented on code in PR #7116:
URL: https://github.com/apache/nifi/pull/7116#discussion_r1183708724
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
private static final List<PropertyDescriptor> propDescriptors;
- private volatile ProcessSession currentSession;
- private BinaryLogClient binlogClient;
- private BinlogEventListener eventListener;
- private BinlogLifecycleListener lifecycleListener;
- private GtidSet gtidSet;
+ private volatile BinaryLogClient binlogClient;
+ private volatile BinlogEventListener eventListener;
+ private volatile BinlogLifecycleListener lifecycleListener;
+ private volatile GtidSet gtidSet;
// Set queue capacity to avoid excessive memory consumption
private final BlockingQueue<RawBinlogEvent> queue = new
LinkedBlockingQueue<>(1000);
- private volatile String currentBinlogFile = null;
- private volatile long currentBinlogPosition = 4;
- private volatile String currentGtidSet = null;
-
- // The following variables save the value of the binlog filename,
position, (sequence id), and gtid at the beginning of a transaction. Used for
rollback
- private volatile String xactBinlogFile = null;
- private volatile long xactBinlogPosition = 4;
- private volatile long xactSequenceId = 0;
- private volatile String xactGtidSet = null;
-
- private volatile TableInfo currentTable = null;
- private volatile String currentDatabase = null;
- private volatile Pattern databaseNamePattern;
- private volatile Pattern tableNamePattern;
- private volatile boolean includeBeginCommit = false;
- private volatile boolean includeDDLEvents = false;
- private volatile boolean useGtid = false;
- private volatile boolean inTransaction = false;
- private volatile boolean skipTable = false;
- private final AtomicBoolean hasRun = new AtomicBoolean(false);
+ private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new
HashMap<>();
- private int currentHost = 0;
- private String transitUri = "<unknown>";
+ private volatile ProcessSession currentSession;
+ private DataCaptureState currentDataCaptureState = new DataCaptureState();
- private final AtomicLong currentSequenceId = new AtomicLong(0);
+ private volatile BinlogResourceInfo binlogResourceInfo = new
BinlogResourceInfo();
- private volatile DistributedMapCacheClient cacheClient = null;
- private final Serializer<TableInfoCacheKey> cacheKeySerializer = new
TableInfoCacheKey.Serializer();
- private final Serializer<TableInfo> cacheValueSerializer = new
TableInfo.Serializer();
- private final Deserializer<TableInfo> cacheValueDeserializer = new
TableInfo.Deserializer();
+ private volatile Pattern databaseNamePattern;
+ private volatile Pattern tableNamePattern;
+ private volatile boolean skipTable = false;
+ private int currentHost = 0;
+ private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
- private JDBCConnectionHolder jdbcConnectionHolder = null;
+ private final BinlogEventState binlogEventState = new BinlogEventState();
private final BeginTransactionEventWriter beginEventWriter = new
BeginTransactionEventWriter();
+ private final BeginEventHandler beginEventHandler = new
BeginEventHandler();
private final CommitTransactionEventWriter commitEventWriter = new
CommitTransactionEventWriter();
+ private final CommitEventHandler commitEventHandler = new
CommitEventHandler();
private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
+ private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+ private final InsertEventHandler insertEventHandler = new
InsertEventHandler();
Review Comment:
That was what I was hoping to do, but I think the required parameters made
the interface method pretty awkward. I can take another look
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]