markap14 commented on a change in pull request #4901:
URL: https://github.com/apache/nifi/pull/4901#discussion_r811980517
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
##########
@@ -232,64 +238,77 @@ public void onEnabled(final ConfigurationContext context)
throws InitializationE
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {
Review comment:
@mattyb149 sorry, I think I misread the code :) Here we are using
`baos.reset()` to clear the buffer between records. Ignore this
comment.
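
For anyone else reading this thread, here is a minimal sketch of the pattern in isolation. It is not the code from this PR: the class `BufferResetSketch` and the helper `toPayloads` are made up for illustration, plain strings stand in for records, and the `RecordSetWriter` and Kafka producer are left out. It only shows that calling `ByteArrayOutputStream.reset()` after each record keeps one record's bytes from leaking into the next payload.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BufferResetSketch {

    // Hypothetical helper: serializes each record into one shared buffer
    // and returns a separate payload per record.
    static List<byte[]> toPayloads(final List<String> records) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        final List<byte[]> payloads = new ArrayList<>();

        for (final String record : records) {
            // Stand-in for writing a record through a record writer.
            baos.write(record.getBytes(StandardCharsets.UTF_8));

            // toByteArray() returns a copy of the bytes written so far.
            payloads.add(baos.toByteArray());

            // Discard the buffered bytes so the next record starts from an
            // empty buffer; the underlying array is reused, not reallocated.
            baos.reset();
        }
        return payloads;
    }

    public static void main(final String[] args) throws IOException {
        for (final byte[] payload : toPayloads(Arrays.asList("a", "bb", "ccc"))) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Without the `reset()` call, the second payload would hold the bytes of the first two records and the third would hold all three.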