nandorsoma commented on code in PR #7116: URL: https://github.com/apache/nifi/pull/7116#discussion_r1176308032
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import org.apache.nifi.cdc.event.TableInfo; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class DDLEventHandler implements BinlogEventHandler<QueryEventData, DDLEventInfo> { + @Override + public void handleEvent(final QueryEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, 
AbstractBinlogEventWriter<DDLEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null) Review Comment: The parentheses are not needed, and the variable can be final. Variables could be final in the other cases as well. Could you fix them? ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import org.apache.nifi.cdc.event.TableInfo; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class DDLEventHandler implements BinlogEventHandler<QueryEventData, DDLEventInfo> { + @Override + public void handleEvent(final QueryEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, AbstractBinlogEventWriter<DDLEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + TableInfo ddlTableInfo = (binlogResourceInfo.getCurrentTable() != null) + ? binlogResourceInfo.getCurrentTable() + : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null); + DDLEventInfo ddlEvent = dataCaptureState.isUseGtid() Review Comment: I think ddlTableInfo and ddlEvent creation could go into the `if (writeEvent)` block to eliminate unnecessary memory allocation. ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class DeleteEventHandler implements BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> { + @Override + public void handleEvent(final DeleteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, AbstractBinlogEventWriter<DeleteRowsEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid() Review Comment: Same as at DDLEventHandler. We could move those lines into the `if (writeEvent)` block. 
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class UpdateEventHandler implements BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> { + @Override + public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, 
AbstractBinlogEventWriter<UpdateRowsEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + UpdateRowsEventInfo eventInfo = dataCaptureState.isUseGtid() Review Comment: Same like in the other cases. ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java: ########## @@ -19,12 +19,35 @@ import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo; import java.io.IOException; +import java.io.Serializable; /** * An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g. */ public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> { + protected Object getWritableObject(Integer type, Serializable value) { + if (value == null) { + return null; + } + if (type == null) { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else if (value instanceof Number) { + return value; + } + } else if (value instanceof Number) { + return value; + } else { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else { + return value.toString(); + } + } + return null; + } Review Comment: I think it is a bit easier to read this way: ```suggestion protected Object getWritableObject(Integer type, Serializable value) { if (value == null) { return null; } if (type == null) { if (value instanceof byte[]) { return new String((byte[]) value); } else if (value instanceof Number) { return value; } else { return null; } } else { if (value instanceof byte[]) { return new String((byte[]) value); } else if (value instanceof Number) { return value; } else { return value.toString(); } } } ``` ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java: ########## @@ -0,0 +1,45 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> { + @Override + public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid() Review Comment: Same 
as in the other cases. ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.EventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.BinlogEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +public interface BinlogEventHandler<T extends EventData, S extends BinlogEventInfo> { + + void handleEvent(T eventData, + final boolean writeEvent, Review Comment: I'm not really in favor of using finals, but I think when we use them, we should use them consistently. It seems like the other parameters in this signature can be final too.
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -228,13 +225,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() .name("capture-change-mysql-hosts") - .displayName("MySQL Hosts") - .description("A list of hostname/port entries corresponding to nodes in a MySQL cluster. The entries should be comma separated " - + "using a colon such as host1:port,host2:port,.... For example mysql.myhost.com:3306. This processor will attempt to connect to " + .displayName("MySQL Nodes") Review Comment: In a separate PR we could rename this property on the main branch. What do you think? ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.EventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class CommitEventHandler implements BinlogEventHandler<EventData, CommitTransactionEventInfo> { + @Override + public void handleEvent(final EventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, AbstractBinlogEventWriter<CommitTransactionEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + final String currentDatabase = binlogResourceInfo.getCurrentDatabase(); Review Comment: As in the other cases it is enough to create currentDatabase and commitEvent in the `if (writeEvent)` block. ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -317,10 +315,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder() .name("capture-change-mysql-dist-map-cache-client") - .displayName("Distributed Map Cache Client") - .description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. " - + "needed by the processor. 
If a client is not specified, the generated events will not include column type or name information (but they will include database " - + "and table information.") + .displayName("Distributed Map Cache Client - unused") Review Comment: In a separate PR we could remove this property on the main branch. I also see that there is another deprecated property "State Update Interval". Maybe we could remove that also. What do you think? ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private static final List<PropertyDescriptor> propDescriptors; - private volatile ProcessSession currentSession; - private BinaryLogClient binlogClient; - private BinlogEventListener eventListener; - private BinlogLifecycleListener lifecycleListener; - private GtidSet gtidSet; + private volatile BinaryLogClient binlogClient; + private volatile BinlogEventListener eventListener; + private volatile BinlogLifecycleListener lifecycleListener; + private volatile GtidSet gtidSet; // Set queue capacity to avoid excessive memory consumption private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000); - private volatile String currentBinlogFile = null; - private volatile long currentBinlogPosition = 4; - private volatile String currentGtidSet = null; - - // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. 
Used for rollback - private volatile String xactBinlogFile = null; - private volatile long xactBinlogPosition = 4; - private volatile long xactSequenceId = 0; - private volatile String xactGtidSet = null; - - private volatile TableInfo currentTable = null; - private volatile String currentDatabase = null; - private volatile Pattern databaseNamePattern; - private volatile Pattern tableNamePattern; - private volatile boolean includeBeginCommit = false; - private volatile boolean includeDDLEvents = false; - private volatile boolean useGtid = false; - private volatile boolean inTransaction = false; - private volatile boolean skipTable = false; - private final AtomicBoolean hasRun = new AtomicBoolean(false); + private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>(); - private int currentHost = 0; - private String transitUri = "<unknown>"; + private volatile ProcessSession currentSession; + private DataCaptureState currentDataCaptureState = new DataCaptureState(); - private final AtomicLong currentSequenceId = new AtomicLong(0); + private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo(); Review Comment: It's my personal taste, so feel free to ignore it, but I think it is better to initialize the objects when you actually fill them with values, which in this case is the setup method. Also, when you stop the processor, you can clean them up by assigning null to them. This way the processor will use less memory while it is stopped. What do you think? ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.EventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.BinlogEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +public interface BinlogEventHandler<T extends EventData, S extends BinlogEventInfo> { + + void handleEvent(T eventData, + final boolean writeEvent, + DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, + CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, + AbstractBinlogEventWriter<S> eventWriter, Review Comment: It seems like there is a 1:1 connection between EventHandlers and EventWriters. So InsertEventHandler will always use InsertEventWriter. Is there a specific reason then that the eventWriter is coming from "outside"? Am I missing something? 
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce if (!skipTable) { TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId()); - if (cacheClient != null) { + binlogResourceInfo.setCurrentTable(tableInfoCache.get(key)); + if (binlogResourceInfo.getCurrentTable() == null) { + // We don't have an entry for this table yet, so fetch the info from the database and populate the cache try { - currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer); - } catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce); - } - - if (currentTable == null) { - // We don't have an entry for this table yet, so fetch the info from the database and populate the cache - try { - currentTable = loadTableInfo(key); - try { - cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer); - } catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce); - } - } catch (SQLException se) { - // Propagate the error up, so things like rollback and logging/bulletins can be handled - throw new IOException(se.getMessage(), se); - } + binlogResourceInfo.setCurrentTable(loadTableInfo(key)); + tableInfoCache.put(key, binlogResourceInfo.getCurrentTable()); + } catch (SQLException se) { + // Propagate the error up, so things like rollback and logging/bulletins can be handled + throw new IOException(se.getMessage(), se); } - } else { - // Populate a limited version of TableInfo without column information - currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList()); } } else { - 
// Clear the current table, to force a reload next time we get a TABLE_MAP event we care about - currentTable = null; + // Clear the current table, to force reload next time we get a TABLE_MAP event we care about + binlogResourceInfo.setCurrentTable(null); } break; case QUERY: QueryEventData queryEventData = event.getData(); - currentDatabase = queryEventData.getDatabase(); + binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase()); String sql = queryEventData.getSql(); // Is this the start of a transaction? if ("BEGIN".equals(sql)) { // If we're already in a transaction, something bad happened, alert the user - if (inTransaction) { + if (binlogResourceInfo.isInTransaction()) { getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid " - + "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile); + + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.) - xactBinlogFile = currentBinlogFile; - xactBinlogPosition = currentBinlogPosition; - xactSequenceId = currentSequenceId.get(); - xactGtidSet = currentGtidSet; - - if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - BeginTransactionEventInfo beginEvent = useGtid - ? 
new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = beginEvent; - currentEventWriter = beginEventWriter; - currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) { Review Comment: I think this can be simplified. ```suggestion if ((databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) { ``` ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -464,58 +460,44 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private static final List<PropertyDescriptor> propDescriptors; - private volatile ProcessSession currentSession; - private BinaryLogClient binlogClient; - private BinlogEventListener eventListener; - private BinlogLifecycleListener lifecycleListener; - private GtidSet gtidSet; + private volatile BinaryLogClient binlogClient; + private volatile BinlogEventListener eventListener; + private volatile BinlogLifecycleListener lifecycleListener; + private volatile GtidSet gtidSet; // Set queue capacity to avoid excessive memory consumption private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000); - private volatile String currentBinlogFile = null; - private volatile long currentBinlogPosition = 4; - private volatile String currentGtidSet = null; - - // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. 
Used for rollback - private volatile String xactBinlogFile = null; - private volatile long xactBinlogPosition = 4; - private volatile long xactSequenceId = 0; - private volatile String xactGtidSet = null; - - private volatile TableInfo currentTable = null; - private volatile String currentDatabase = null; - private volatile Pattern databaseNamePattern; - private volatile Pattern tableNamePattern; - private volatile boolean includeBeginCommit = false; - private volatile boolean includeDDLEvents = false; - private volatile boolean useGtid = false; - private volatile boolean inTransaction = false; - private volatile boolean skipTable = false; - private final AtomicBoolean hasRun = new AtomicBoolean(false); + private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>(); - private int currentHost = 0; - private String transitUri = "<unknown>"; + private volatile ProcessSession currentSession; + private DataCaptureState currentDataCaptureState = new DataCaptureState(); - private final AtomicLong currentSequenceId = new AtomicLong(0); + private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo(); - private volatile DistributedMapCacheClient cacheClient = null; - private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer(); - private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer(); - private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer(); + private volatile Pattern databaseNamePattern; + private volatile Pattern tableNamePattern; + private volatile boolean skipTable = false; + private int currentHost = 0; + private volatile JDBCConnectionHolder jdbcConnectionHolder = null; - private JDBCConnectionHolder jdbcConnectionHolder = null; + private final BinlogEventState binlogEventState = new BinlogEventState(); private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); + private final 
BeginEventHandler beginEventHandler = new BeginEventHandler(); private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); + private final CommitEventHandler commitEventHandler = new CommitEventHandler(); private final DDLEventWriter ddlEventWriter = new DDLEventWriter(); + private final DDLEventHandler ddlEventHandler = new DDLEventHandler(); private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); + private final InsertEventHandler insertEventHandler = new InsertEventHandler(); Review Comment: I'm wondering whether we could extract these handlers into a separate class that is able to retrieve the required handler. This could greatly simplify the code, but I'm not sure that we can shrink the required parameters to a simple enough interface. ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -974,139 +938,92 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce if (!skipTable) { TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId()); - if (cacheClient != null) { + binlogResourceInfo.setCurrentTable(tableInfoCache.get(key)); + if (binlogResourceInfo.getCurrentTable() == null) { + // We don't have an entry for this table yet, so fetch the info from the database and populate the cache try { - currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer); - } catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce); - } - - if (currentTable == null) { - // We don't have an entry for this table yet, so fetch the info from the database and populate the cache - try { - currentTable = loadTableInfo(key); - try { - cacheClient.put(key, currentTable, cacheKeySerializer, 
cacheValueSerializer); - }
catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce); - } - } catch (SQLException se) { - // Propagate the error up, so things like rollback and logging/bulletins can be handled - throw new IOException(se.getMessage(), se); - } + binlogResourceInfo.setCurrentTable(loadTableInfo(key)); + tableInfoCache.put(key, binlogResourceInfo.getCurrentTable()); + } catch (SQLException se) { + // Propagate the error up, so things like rollback and logging/bulletins can be handled + throw new IOException(se.getMessage(), se); } - } else { - // Populate a limited version of TableInfo without column information - currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList()); } } else { - // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about - currentTable = null; + // Clear the current table, to force reload next time we get a TABLE_MAP event we care about + binlogResourceInfo.setCurrentTable(null); } break; case QUERY: QueryEventData queryEventData = event.getData(); - currentDatabase = queryEventData.getDatabase(); + binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase()); String sql = queryEventData.getSql(); // Is this the start of a transaction? if ("BEGIN".equals(sql)) { // If we're already in a transaction, something bad happened, alert the user - if (inTransaction) { + if (binlogResourceInfo.isInTransaction()) { getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. 
This could indicate that your binlog position is invalid " - + "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile); + + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.) - xactBinlogFile = currentBinlogFile; - xactBinlogPosition = currentBinlogPosition; - xactSequenceId = currentSequenceId.get(); - xactGtidSet = currentGtidSet; - - if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - BeginTransactionEventInfo beginEvent = useGtid - ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = beginEvent; - currentEventWriter = beginEventWriter; - currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) { + beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo, + binlogEventState, sql, beginEventWriter, eventWriterConfiguration, currentSession, timestamp); } - inTransaction = true; - //update inTransaction value to state - updateState(session); + // Whether we skip this event or not, it's still the beginning of a transaction + binlogResourceInfo.setInTransaction(true); + + // Update inTransaction value to state + updateState(session, dataCaptureState); } else if ("COMMIT".equals(sql)) { - if (!inTransaction) { + // InnoDB generates XID events for "commit", but MyISAM generates Query 
events with "COMMIT", so handle that here + if (!binlogResourceInfo.isInTransaction()) { getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). " + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state " - + "or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile); + + "or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here - if (includeBeginCommit) { - if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) { - CommitTransactionEventInfo commitTransactionEvent = useGtid - ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = commitTransactionEvent; - currentEventWriter = commitEventWriter; - currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - } - } else { - // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed. 
- if (currentSession != null) { - FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); - if (flowFile != null && currentEventWriter != null) { - // Flush the events to the FlowFile when the processor is stopped - currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); - } - currentSession.commitAsync(); - } + if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) { + commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo, + binlogEventState, sql, commitEventWriter, eventWriterConfiguration, currentSession, timestamp); } - - //update inTransaction value to state - inTransaction = false; - updateState(session); + // Whether we skip this event or not, it's the end of a transaction + binlogResourceInfo.setInTransaction(false); + updateState(session, dataCaptureState); // If there is no FlowFile open, commit the session if (eventWriterConfiguration.getCurrentFlowFile() == null) { // Commit the NiFi session session.commitAsync(); } - currentTable = null; + binlogResourceInfo.setCurrentTable(null); + binlogResourceInfo.setCurrentDatabase(null); } else { // Check for DDL events (alter table, e.g.). 
Normalize the query to do string matching on the type of change String normalizedQuery = normalizeQuery(sql); - if (normalizedQuery.startsWith("alter table") - || normalizedQuery.startsWith("alter ignore table") - || normalizedQuery.startsWith("create table") - || normalizedQuery.startsWith("truncate table") - || normalizedQuery.startsWith("rename table") - || normalizedQuery.startsWith("drop table") - || normalizedQuery.startsWith("drop database")) { - - if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - // If we don't have table information, we can still use the database name - TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null); - DDLEventInfo ddlEvent = useGtid - ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql) - : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql); - currentEventInfo = ddlEvent; - currentEventWriter = ddlEventWriter; - currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - } - // Remove all the keys from the cache that this processor added - if (cacheClient != null) { - cacheClient.removeByPattern(this.getIdentifier() + ".*"); + if (isQueryDDL(normalizedQuery)) { + if (!(databaseNamePattern != null && !databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())) { Review Comment: This can be simplified. 
```suggestion if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) { ``` ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -839,19 +802,18 @@ protected void connect(List<InetSocketAddress> hosts, String username, String pa InetSocketAddress connectedHost = null; Exception lastConnectException = new Exception("Unknown connection error"); - if (createEnrichmentConnection) { - try { - // Ensure driverLocation and driverName are correct before establishing binlog connection - // to avoid failing after binlog messages are received. - // Actual JDBC connection is created after binlog client gets started, because we need - // the connect-able host same as the binlog client. - registerDriver(driverLocation, driverName); - } catch (InitializationException e) { - throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" + - " and MySQL Driver Class Name are configured correctly. " + e, e); - } + try { Review Comment: Can you explain why the `createEnrichmentConnection` check is no longer needed? I don't follow.
########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java: ########## @@ -1391,8 +1296,12 @@ private class JDBCConnectionHolder { private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) { this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort(); connectionProps.putAll(customProperties); - connectionProps.put("user", username); - connectionProps.put("password", password); + if (username != null) { + connectionProps.put("user", username); + } + if (password != null) { Review Comment: Does it make sense to set password when username is null? ########## nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> { + @Override + public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, DataCaptureState dataCaptureState, + CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, AbstractBinlogEventWriter<InsertRowsEventInfo> eventWriter, + EventWriterConfiguration eventWriterConfiguration, ProcessSession session, final long timestamp) { + InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid() + ? new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData) + : new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData); + if (writeEvent) { Review Comment: There are multiple cases where the handler does nothing when it is not a write event. If the handler were able to decide on its own whether it is a write event or not, it would be OK. But in this case, I think we shouldn't even call the event handler. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
