tpalfy commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1100221116
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -185,6 +194,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
SSLMode.VERIFY_IDENTITY.toString(),
"Connect with TLS or fail when server support not enabled. Verify server hostname matches presented X.509 certificate names or fail when not matched");
+ private static final AllowableValue N_EVENTS_PER_FLOWFILE_STRATEGY = new AllowableValue(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.name(), "N Events Per FlowFile",
+ "This strategy causes the number of events specified in the Events per FlowFile each binlog event to be written to its own FlowFile");
Review Comment:
```suggestion
"This strategy causes the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile");
```
Btw, don't we want to let the processor write _fewer_ events than
specified (if only that many are available during a single `onTrigger` run)?
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
}
protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
- return new HashMap<String, String>() {
+ return new HashMap<>() {
Review Comment:
This change doesn't work on Java 8: the diamond operator can't be used with
an anonymous inner class before Java 9.
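For illustration, a minimal sketch of two forms that compile on Java 8. The class name, method names and the `sequence.id` key are hypothetical and only stand in for what `getCommonAttributes` builds; they are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for getCommonAttributes; the attribute key is illustrative only.
class CommonAttributesSketch {

    // Option 1: keep the anonymous subclass, but spell out the type arguments.
    // "new HashMap<>() { ... }" is only accepted from Java 9 onwards.
    static Map<String, String> withExplicitTypeArguments(final long sequenceId) {
        return new HashMap<String, String>() {
            {
                put("sequence.id", Long.toString(sequenceId));
            }
        };
    }

    // Option 2: drop the anonymous subclass, so the diamond is fine on Java 8.
    static Map<String, String> withPlainMap(final long sequenceId) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("sequence.id", Long.toString(sequenceId));
        return attributes;
    }
}
```

Both methods return maps with the same entries, so either would be a drop-in choice here.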
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1017,15 +1096,19 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = commitTransactionEvent;
+ currentEventWriter = commitEventWriter;
+ currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
Review Comment:
We are in a block that only runs when the user has requested the emission of
BEGIN and COMMIT events (on the processor).
When that is not requested, `commitEventWriter.writeEvent` won't run and thus
the flowfile will not be sent out.
In short, it doesn't work when the user wants ONE_TRANSACTION_PER_FLOWFILE
but doesn't want BEGIN and COMMIT events.
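A minimal, self-contained sketch of the idea that finishing the flowfile should not depend on whether the COMMIT event itself is written. Everything here (the class, `includeBeginCommit`, `finishOpenFlowFile`) is a hypothetical simplification with plain lists instead of NiFi sessions, not the processor's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a "flowfile" is just a list of event names.
class TransactionFlushSketch {
    final boolean includeBeginCommit;                           // stands in for the BEGIN/COMMIT processor setting
    final List<String> openFlowFile = new ArrayList<>();        // events written to the flowfile in progress
    final List<List<String>> transferred = new ArrayList<>();   // flowfiles that were sent out

    TransactionFlushSketch(final boolean includeBeginCommit) {
        this.includeBeginCommit = includeBeginCommit;
    }

    void onRowEvent(final String event) {
        openFlowFile.add(event);
    }

    void onCommit() {
        // Writing the COMMIT event is optional...
        if (includeBeginCommit) {
            openFlowFile.add("COMMIT");
        }
        // ...but the flowfile is finished either way, outside the optional block.
        finishOpenFlowFile();
    }

    private void finishOpenFlowFile() {
        if (!openFlowFile.isEmpty()) {
            transferred.add(new ArrayList<>(openFlowFile));
            openFlowFile.clear();
        }
    }
}
```

With `includeBeginCommit` set to `false`, a transaction with two row events still produces one transferred flowfile containing just those two events.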
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -967,13 +1026,19 @@ public void outputEvents(ProcessSession session, ComponentLog log) throws IOExce
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = commitTransactionEvent;
+ currentEventWriter = commitEventWriter;
+ currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
+
//update inTransaction value to state
inTransaction = false;
updateState(session);
- // Commit the NiFi session
- session.commitAsync();
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ // Commit the NiFi session
+ session.commitAsync();
Review Comment:
The process session is committed only after a transaction, which is somewhat
desirable. But because of this, the N events per flowfile strategy can hold
back a flowfile with N events already in it until a session commit comes. And
that session commit may never actually come.
Consider the following case:
- N=5
- 1st transaction inserts 4 rows.
- 2nd transaction inserts 2 rows.
- When the event for the first insert of the 2nd transaction is received, we
finish up the current flowfile (it has 4 events from the 1st transaction and 1
event from the 2nd transaction). However, we don't commit the process session
(we are not handling a commit event!), so the flowfile is held back.
- When the commit for the 2nd transaction is received, we have only 1 event
in the flowfile and it is still open (not null). So we don't commit the
process session.
- A bunch of flowfiles can queue up this way until we reach a point where
a commit comes when the flowfile has exactly N events, or until we stop the
processor.
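The case above can be reproduced with a small stand-alone model. This is only a sketch under simplifying assumptions: counters replace real flowfiles and sessions, BEGIN/COMMIT events are not written into flowfiles, and all names are hypothetical. The `commitOnFinish` flag shows one possible mitigation (committing as soon as a flowfile is finished), not necessarily the right fix for this PR.

```java
// Hypothetical counter-based model of the "N events per flowfile" hold-back.
class NEventsHoldBackSketch {
    final int eventsPerFlowFile;
    final boolean commitOnFinish;  // hypothetical mitigation switch
    int openEvents = 0;            // events in the open flowfile; 0 means no flowfile is open
    int heldBack = 0;              // finished flowfiles waiting for a session commit
    int released = 0;              // flowfiles released by a session commit

    NEventsHoldBackSketch(final int eventsPerFlowFile, final boolean commitOnFinish) {
        this.eventsPerFlowFile = eventsPerFlowFile;
        this.commitOnFinish = commitOnFinish;
    }

    void onRowEvent() {
        openEvents++;
        if (openEvents == eventsPerFlowFile) {
            // The flowfile is full: finish it, but it stays in the uncommitted session.
            openEvents = 0;
            heldBack++;
            if (commitOnFinish) {
                commitSession();
            }
        }
    }

    void onCommitEvent() {
        // Mirrors the reviewed condition: commit only when no flowfile is open.
        if (openEvents == 0) {
            commitSession();
        }
    }

    private void commitSession() {
        released += heldBack;
        heldBack = 0;
    }

    // N=5 case from the comment: 4 inserts + commit, then 2 inserts + commit.
    static void runScenario(final NEventsHoldBackSketch model) {
        for (int i = 0; i < 4; i++) {
            model.onRowEvent();
        }
        model.onCommitEvent();
        for (int i = 0; i < 2; i++) {
            model.onRowEvent();
        }
        model.onCommitEvent();
    }
}
```

Running `runScenario` with `commitOnFinish` set to `false` leaves one finished flowfile held back and none released; with `true`, the finished flowfile is released immediately, at the cost of committing in the middle of a transaction.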