mark-bathori commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1101506026


##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final 
long sequenceId, BinlogE
 
     // Default implementation for binlog events
     @Override
-    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship) {
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, (outputStream) -> {
+    public long writeEvent(ProcessSession session, String transitUri, T 
eventInfo, long currentSequenceId, Relationship relationship,
+                           final EventWriterConfiguration 
eventWriterConfiguration) {
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        if (flowFile == null) {
+            flowFile = session.create();
+            OutputStream flowFileOutputStream = session.write(flowFile);
+            
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+            eventWriterConfiguration.setCurrentFlowFile(flowFile);
+            if (eventWriterConfiguration.getJsonGenerator() == null) {
+                try {
+                    jsonGenerator = createJsonGenerator(flowFileOutputStream);
+                    eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't create JSON 
generator", ioe);
+                }
+            }
+            if 
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                    && eventWriterConfiguration.getNumberOfEventsPerFlowFile() 
> 1)
+                    || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+                try {
+                    jsonGenerator.writeStartArray();
+                } catch (IOException ioe) {
+                    throw new FlowFileAccessException("Couldn't write start of 
event array", ioe);
+                }
+            }
+        }
+        jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+        OutputStream outputStream = 
eventWriterConfiguration.getFlowFileOutputStream();
+        try {
             super.startJson(outputStream, eventInfo);
             writeJson(eventInfo);
             // Nothing in the body
             super.endJson();
-        });
-        flowFile = session.putAllAttributes(flowFile, 
getCommonAttributes(currentSequenceId, eventInfo));
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't write start of event 
array", ioe);
+        }
+
+        eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+        // Check if it is time to finish the FlowFile
+        if 
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+                && eventWriterConfiguration.getNumberOfEventsWritten() == 
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+            flowFile = finishAndTransferFlowFile(eventWriterConfiguration, 
transitUri, currentSequenceId, eventInfo, relationship);
+        }
+        eventWriterConfiguration.setCurrentFlowFile(flowFile);
+        return currentSequenceId + 1;
+    }
+
+    public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration 
eventWriterConfiguration, final String transitUri, final long seqId,
+                                              final BinlogEventInfo eventInfo, 
final Relationship relationship) {
+        // If writing multiple events, end the array
+        if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+                || 
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
 {
+            try {
+                jsonGenerator.writeEndArray();
+            } catch (IOException ioe) {
+                throw new FlowFileAccessException("Couldn't write end of event 
array", ioe);
+            }
+        }
+        try {
+            endFile();
+            eventWriterConfiguration.setJsonGenerator(null);
+            eventWriterConfiguration.getFlowFileOutputStream().close();
+        } catch (IOException ioe) {
+            throw new FlowFileAccessException("Couldn't flush and close file", 
ioe);
+        }
+        FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+        ProcessSession session = eventWriterConfiguration.getWorkingSession();
+        if (session == null && flowFile == null) {

Review Comment:
   I think the `&&` operator should be `||` here since either condition is 
false we will got NullPointerException in the next line. The exception's 
description also assumes 'OR' relation.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+    private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+    private int numberOfEventsWritten = 0;
+    private int numberOfEventsPerFlowFile = 1000;
+    private FlowFile currentFlowFile;
+    private OutputStream flowFileOutputStream;
+    private ProcessSession workingSession;
+    private JsonGenerator jsonGenerator;
+
+    public EventWriterConfiguration(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy, int numberOfEventsWritten, int 
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+        this.numberOfEventsWritten = numberOfEventsWritten;
+        this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+        this.currentFlowFile = currentFlowFile;
+    }
+
+    public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+        return flowFileEventWriteStrategy;
+    }
+
+    public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+    }
+
+    public int getNumberOfEventsWritten() {
+        return numberOfEventsWritten;
+    }
+
+    public void setNumberOfEventsWritten(int numberOfEventsWritten) {

Review Comment:
   This method is always called with the same value. Maybe remove the method 
param and set the `numberOfEventsWritten` to 0 within the method's body and 
rename the function according to it's modified functionality.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+    private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+    private int numberOfEventsWritten = 0;
+    private int numberOfEventsPerFlowFile = 1000;
+    private FlowFile currentFlowFile;
+    private OutputStream flowFileOutputStream;
+    private ProcessSession workingSession;
+    private JsonGenerator jsonGenerator;
+
+    public EventWriterConfiguration(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy, int numberOfEventsWritten, int 
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+        this.numberOfEventsWritten = numberOfEventsWritten;
+        this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+        this.currentFlowFile = currentFlowFile;
+    }
+
+    public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+        return flowFileEventWriteStrategy;
+    }
+
+    public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy) {

Review Comment:
   This method is unused and since the write strategy is coming from a 
processor property I don't think it should be modifiable after setting it's 
initial value.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+    private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+    private int numberOfEventsWritten = 0;
+    private int numberOfEventsPerFlowFile = 1000;
+    private FlowFile currentFlowFile;
+    private OutputStream flowFileOutputStream;
+    private ProcessSession workingSession;
+    private JsonGenerator jsonGenerator;
+
+    public EventWriterConfiguration(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy, int numberOfEventsWritten, int 
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {

Review Comment:
   Currently this constructor method is only called with 
`numberOfEventsWritten=0` and `currentFlowFile=null`, these parameters can be 
removed from the constructor and have their values initialized at class level.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1241,6 +1335,23 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) 
throws SQLException {
         return tableInfo;
     }
 
+    protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {

Review Comment:
   This method is unused.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+    private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+    private int numberOfEventsWritten = 0;
+    private int numberOfEventsPerFlowFile = 1000;
+    private FlowFile currentFlowFile;
+    private OutputStream flowFileOutputStream;
+    private ProcessSession workingSession;
+    private JsonGenerator jsonGenerator;
+
+    public EventWriterConfiguration(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy, int numberOfEventsWritten, int 
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+        this.numberOfEventsWritten = numberOfEventsWritten;
+        this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+        this.currentFlowFile = currentFlowFile;
+    }
+
+    public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+        return flowFileEventWriteStrategy;
+    }
+
+    public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy 
flowFileEventWriteStrategy) {
+        this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+    }
+
+    public int getNumberOfEventsWritten() {
+        return numberOfEventsWritten;
+    }
+
+    public void setNumberOfEventsWritten(int numberOfEventsWritten) {
+        this.numberOfEventsWritten = numberOfEventsWritten;
+    }
+
+    public void incrementNumberOfEventsWritten() {
+        this.numberOfEventsWritten++;
+    }
+
+    public int getNumberOfEventsPerFlowFile() {
+        return numberOfEventsPerFlowFile;
+    }
+
+    public void setNumberOfEventsPerFlowFile(int numberOfEventsPerFlowFile) {

Review Comment:
   Same as the write strategy field. This method is unused and since this is 
coming from a processor property I don't think it should be modifiable after 
setting it's initial value.



##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
     }
 
     protected Map<String, String> getCommonAttributes(final long sequenceId, 
BinlogEventInfo eventInfo) {
-        return new HashMap<String, String>() {

Review Comment:
   This is causing compilation problem with java8.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to