mattyb149 commented on a change in pull request #4901:
URL: https://github.com/apache/nifi/pull/4901#discussion_r811934413
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
##########
@@ -232,64 +238,77 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
     public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
         try {
-            WriteResult writeResult;
             final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
             final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
             int recordCount = 0;
             try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
-                writer.beginRecordSet();
                 Record record;
                 while ((record = recordSet.next()) != null) {
Review comment:
It's been a while since I looked at this, but the intent was to send
individual messages rather than all records in one message. Can this still
cause heap exhaustion if we don't ack the messages until they have all been
sent? What would the property be, maybe "Messages per Batch"?
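
To make the question concrete, here is a rough sketch of what a bounded batch could look like. It is not the code in this PR: the class name, `sendInBatches`, and the `messagesPerBatch` parameter are all hypothetical, and `sender` stands in for whatever wraps the producer's send call and completes when the message is acknowledged. Each record still goes out as its own message, but acks are awaited every `messagesPerBatch` sends, so under these assumptions the number of unacknowledged messages held on the heap is capped by the batch size rather than by the size of the record set.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of NiFi or Kafka.
final class BatchedSendSketch {

    private BatchedSendSketch() {
    }

    /**
     * Sends each record as an individual message and waits for
     * acknowledgements after every messagesPerBatch sends, so that at most
     * messagesPerBatch sends are pending at any time.
     *
     * @return the total number of records sent
     */
    static <T> int sendInBatches(final Iterator<T> records,
                                 final Function<T, CompletableFuture<Void>> sender,
                                 final int messagesPerBatch) {
        if (messagesPerBatch < 1) {
            throw new IllegalArgumentException("messagesPerBatch must be at least 1");
        }

        final List<CompletableFuture<Void>> inFlight = new ArrayList<>(messagesPerBatch);
        int recordCount = 0;

        while (records.hasNext()) {
            // One message per record, as in the original intent.
            inFlight.add(sender.apply(records.next()));
            recordCount++;

            // Once the batch is full, block until every send in it is acked.
            if (inFlight.size() >= messagesPerBatch) {
                awaitAcks(inFlight);
            }
        }

        // Wait for the final, possibly partial, batch.
        awaitAcks(inFlight);
        return recordCount;
    }

    private static void awaitAcks(final List<CompletableFuture<Void>> inFlight) {
        // join() throws CompletionException if any send in the batch failed.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
    }
}
```

With a batch size of 1 this degenerates to a synchronous send per record; a larger value trades memory for throughput. Whether the property should count messages or bytes is a separate question.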