mark-bathori commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1101506026
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final
long sequenceId, BinlogE
// Default implementation for binlog events
@Override
- public long writeEvent(ProcessSession session, String transitUri, T
eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, T
eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration
eventWriterConfiguration) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile == null) {
+ flowFile = session.create();
+ OutputStream flowFileOutputStream = session.write(flowFile);
+
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+ eventWriterConfiguration.setCurrentFlowFile(flowFile);
+ if (eventWriterConfiguration.getJsonGenerator() == null) {
+ try {
+ jsonGenerator = createJsonGenerator(flowFileOutputStream);
+ eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't create JSON
generator", ioe);
+ }
+ }
+ if
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+ && eventWriterConfiguration.getNumberOfEventsPerFlowFile()
> 1)
+ ||
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
{
+ try {
+ jsonGenerator.writeStartArray();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write start of
event array", ioe);
+ }
+ }
+ }
+ jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+ OutputStream outputStream =
eventWriterConfiguration.getFlowFileOutputStream();
+ try {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile,
getCommonAttributes(currentSequenceId, eventInfo));
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write start of event
array", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+ && eventWriterConfiguration.getNumberOfEventsWritten() ==
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ flowFile = finishAndTransferFlowFile(eventWriterConfiguration,
transitUri, currentSequenceId, eventInfo, relationship);
+ }
+ eventWriterConfiguration.setCurrentFlowFile(flowFile);
+ return currentSequenceId + 1;
+ }
+
+ public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration
eventWriterConfiguration, final String transitUri, final long seqId,
+ final BinlogEventInfo eventInfo,
final Relationship relationship) {
+ // If writing multiple events, end the array
+ if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+ ||
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
{
+ try {
+ jsonGenerator.writeEndArray();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write end of event
array", ioe);
+ }
+ }
+ try {
+ endFile();
+ eventWriterConfiguration.setJsonGenerator(null);
+ eventWriterConfiguration.getFlowFileOutputStream().close();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't flush and close file",
ioe);
+ }
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ ProcessSession session = eventWriterConfiguration.getWorkingSession();
+ if (session == null && flowFile == null) {
Review Comment:
I think the `&&` operator should be `||` here: with `&&`, if only one of
`session` and `flowFile` is null the check does not fire, and we will get a
NullPointerException on the next line. The exception's description also
assumes an 'OR' relation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]