mark-bathori commented on code in PR #6907:
URL: https://github.com/apache/nifi/pull/6907#discussion_r1101506026
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -62,17 +67,112 @@ protected Map<String, String> getCommonAttributes(final
long sequenceId, BinlogE
// Default implementation for binlog events
@Override
- public long writeEvent(ProcessSession session, String transitUri, T
eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, T
eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration
eventWriterConfiguration) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile == null) {
+ flowFile = session.create();
+ OutputStream flowFileOutputStream = session.write(flowFile);
+
eventWriterConfiguration.setFlowFileOutputStream(flowFileOutputStream);
+ eventWriterConfiguration.setCurrentFlowFile(flowFile);
+ if (eventWriterConfiguration.getJsonGenerator() == null) {
+ try {
+ jsonGenerator = createJsonGenerator(flowFileOutputStream);
+ eventWriterConfiguration.setJsonGenerator(jsonGenerator);
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't create JSON
generator", ioe);
+ }
+ }
+ if
((FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+ && eventWriterConfiguration.getNumberOfEventsPerFlowFile()
> 1)
+ ||
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
{
+ try {
+ jsonGenerator.writeStartArray();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write start of
event array", ioe);
+ }
+ }
+ }
+ jsonGenerator = eventWriterConfiguration.getJsonGenerator();
+
+ OutputStream outputStream =
eventWriterConfiguration.getFlowFileOutputStream();
+ try {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile,
getCommonAttributes(currentSequenceId, eventInfo));
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write start of event
array", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if
(FlowFileEventWriteStrategy.N_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())
+ && eventWriterConfiguration.getNumberOfEventsWritten() ==
eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ flowFile = finishAndTransferFlowFile(eventWriterConfiguration,
transitUri, currentSequenceId, eventInfo, relationship);
+ }
+ eventWriterConfiguration.setCurrentFlowFile(flowFile);
+ return currentSequenceId + 1;
+ }
+
+ public FlowFile finishAndTransferFlowFile(final EventWriterConfiguration
eventWriterConfiguration, final String transitUri, final long seqId,
+ final BinlogEventInfo eventInfo,
final Relationship relationship) {
+ // If writing multiple events, end the array
+ if (eventWriterConfiguration.getNumberOfEventsWritten() > 1
+ ||
FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
{
+ try {
+ jsonGenerator.writeEndArray();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't write end of event
array", ioe);
+ }
+ }
+ try {
+ endFile();
+ eventWriterConfiguration.setJsonGenerator(null);
+ eventWriterConfiguration.getFlowFileOutputStream().close();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Couldn't flush and close file",
ioe);
+ }
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ ProcessSession session = eventWriterConfiguration.getWorkingSession();
+ if (session == null && flowFile == null) {
Review Comment:
I think the `&&` operator should be `||` here since either condition is
false we will got NullPointerException in the next line. The exception's
description also assumes 'OR' relation.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+ private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+ private int numberOfEventsWritten = 0;
+ private int numberOfEventsPerFlowFile = 1000;
+ private FlowFile currentFlowFile;
+ private OutputStream flowFileOutputStream;
+ private ProcessSession workingSession;
+ private JsonGenerator jsonGenerator;
+
+ public EventWriterConfiguration(FlowFileEventWriteStrategy
flowFileEventWriteStrategy, int numberOfEventsWritten, int
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ this.numberOfEventsWritten = numberOfEventsWritten;
+ this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+ this.currentFlowFile = currentFlowFile;
+ }
+
+ public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+ return flowFileEventWriteStrategy;
+ }
+
+ public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy
flowFileEventWriteStrategy) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ }
+
+ public int getNumberOfEventsWritten() {
+ return numberOfEventsWritten;
+ }
+
+ public void setNumberOfEventsWritten(int numberOfEventsWritten) {
Review Comment:
This method is always called with the same value. Maybe remove the method
param and set the `numberOfEventsWritten` to 0 within the method's body and
rename the function according to it's modified functionality.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+ private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+ private int numberOfEventsWritten = 0;
+ private int numberOfEventsPerFlowFile = 1000;
+ private FlowFile currentFlowFile;
+ private OutputStream flowFileOutputStream;
+ private ProcessSession workingSession;
+ private JsonGenerator jsonGenerator;
+
+ public EventWriterConfiguration(FlowFileEventWriteStrategy
flowFileEventWriteStrategy, int numberOfEventsWritten, int
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ this.numberOfEventsWritten = numberOfEventsWritten;
+ this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+ this.currentFlowFile = currentFlowFile;
+ }
+
+ public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+ return flowFileEventWriteStrategy;
+ }
+
+ public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy
flowFileEventWriteStrategy) {
Review Comment:
This method is unused and since the write strategy is coming from a
processor property I don't think it should be modifiable after setting it's
initial value.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+ private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+ private int numberOfEventsWritten = 0;
+ private int numberOfEventsPerFlowFile = 1000;
+ private FlowFile currentFlowFile;
+ private OutputStream flowFileOutputStream;
+ private ProcessSession workingSession;
+ private JsonGenerator jsonGenerator;
+
+ public EventWriterConfiguration(FlowFileEventWriteStrategy
flowFileEventWriteStrategy, int numberOfEventsWritten, int
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
Review Comment:
Currently this constructor method is only called with
`numberOfEventsWritten=0` and `currentFlowFile=null`, these parameters can be
removed from the constructor and have their values initialized at class level.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java:
##########
@@ -1241,6 +1335,23 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key)
throws SQLException {
return tableInfo;
}
+ protected Map<String, String> getCommonAttributes(final long sequenceId,
BinlogEventInfo eventInfo) {
Review Comment:
This method is unused.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.event.io;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.io.OutputStream;
+
+public class EventWriterConfiguration {
+
+ private FlowFileEventWriteStrategy flowFileEventWriteStrategy;
+ private int numberOfEventsWritten = 0;
+ private int numberOfEventsPerFlowFile = 1000;
+ private FlowFile currentFlowFile;
+ private OutputStream flowFileOutputStream;
+ private ProcessSession workingSession;
+ private JsonGenerator jsonGenerator;
+
+ public EventWriterConfiguration(FlowFileEventWriteStrategy
flowFileEventWriteStrategy, int numberOfEventsWritten, int
numberOfEventsPerFlowFile, FlowFile currentFlowFile) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ this.numberOfEventsWritten = numberOfEventsWritten;
+ this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
+ this.currentFlowFile = currentFlowFile;
+ }
+
+ public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
+ return flowFileEventWriteStrategy;
+ }
+
+ public void setFlowFileEventWriteStrategy(FlowFileEventWriteStrategy
flowFileEventWriteStrategy) {
+ this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
+ }
+
+ public int getNumberOfEventsWritten() {
+ return numberOfEventsWritten;
+ }
+
+ public void setNumberOfEventsWritten(int numberOfEventsWritten) {
+ this.numberOfEventsWritten = numberOfEventsWritten;
+ }
+
+ public void incrementNumberOfEventsWritten() {
+ this.numberOfEventsWritten++;
+ }
+
+ public int getNumberOfEventsPerFlowFile() {
+ return numberOfEventsPerFlowFile;
+ }
+
+ public void setNumberOfEventsPerFlowFile(int numberOfEventsPerFlowFile) {
Review Comment:
Same as the write strategy field. This method is unused and since this is
coming from a processor property I don't think it should be modifiable after
setting it's initial value.
##########
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java:
##########
@@ -44,7 +49,7 @@ protected void writeJson(T event) throws IOException {
}
protected Map<String, String> getCommonAttributes(final long sequenceId,
BinlogEventInfo eventInfo) {
- return new HashMap<String, String>() {
Review Comment:
This is causing compilation problem with java8.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]