[
https://issues.apache.org/jira/browse/NIFI-4008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069746#comment-16069746
]
ASF GitHub Bot commented on NIFI-4008:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1891#discussion_r124994470
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
---
@@ -418,82 +419,106 @@ private void writeDemarcatedData(final
ProcessSession session, final List<Consum
}
- private void writeRecordData(final ProcessSession session, final
List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition
topicPartition) {
+ private void writeRecordData(final ProcessSession session, final
List<ConsumerRecord<byte[], byte[]>> messages, final TopicPartition
topicPartition) {
// In order to obtain a RecordReader from the RecordReaderFactory,
we need to give it a FlowFile.
// We don't want to create a new FlowFile for each record that we
receive, so we will just create
// a "temporary flowfile" that will be removed in the finally
block below and use that to pass to
// the createRecordReader method.
final FlowFile tempFlowFile = session.create();
RecordSetWriter writer = null;
+ final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception>
handleParseFailure = (consumerRecord, e) -> {
--- End diff --
I reviewed and merged #1906 . I will update this PR again.
> ConsumeKafkaRecord_0_10 assumes there is always one Record in a message
> -----------------------------------------------------------------------
>
> Key: NIFI-4008
> URL: https://issues.apache.org/jira/browse/NIFI-4008
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.2.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
>
> ConsumeKafkaRecord_0_10 uses ConsumerLease underneath, and it [assumes there
> is one Record available in a consumed
> message|https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java#L434]
> retrieved from a Kafka topic.
> But in fact, a message can contain 0 or more records in it. For example, with
> a record schema shown below:
> {code}
> {
> "type": "record",
> "name": "temp",
> "fields" : [
> {"name": "value", "type": "string"}
> ]
> }
> {code}
> Multiple records can be sent within a single message, e.g. using JSON:
> {code}
> [{"value": "a"}, {"value": "b"}, {"value": "c"}]
> {code}
> But ConsumeKafkaRecord only outputs the first record:
> {code}
> [{"value": "a"}]
> {code}
> Also, if a message doesn't contain any record in it, the processor fails with
> NullPointerException.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)