This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e6229ab NIFI-9771: When a Kafka record is obtained during config
verification, we should produce an invalid response if the Record Reader is not
able to produce any records from it
e6229ab is described below
commit e6229ab938571f5339805797b6a6ffeebcfad652
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 7 11:54:37 2022 -0500
NIFI-9771: When a Kafka record is obtained during config verification, we
should produce an invalid response if the Record Reader is not able to produce
any records from it
Signed-off-by: Joe Gresock <[email protected]>
This closes #5847.
---
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 57 +++++++++++++---------
1 file changed, 33 insertions(+), 24 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 6c4cc49..e8e0311 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -24,9 +24,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader;
@@ -403,52 +403,61 @@ public class ConsumerPool implements Closeable {
final Map<String, Integer> recordsPerTopic = new HashMap<>();
for (final ConsumerRecord<byte[], byte[]> consumerRecord :
consumerRecords) {
- recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
final Map<String, String> attributes =
consumerLease.getAttributes(consumerRecord);
+ int numRecords = 0;
final byte[] recordBytes = consumerRecord.value() == null ? new
byte[0] : consumerRecord.value();
try (final InputStream in = new ByteArrayInputStream(recordBytes))
{
final RecordReader reader =
readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
while (reader.nextRecord() != null) {
+ numRecords++;
}
} catch (final Exception e) {
parseFailuresPerTopic.merge(consumerRecord.topic(), 1,
Integer::sum);
latestParseFailureDescription.put(consumerRecord.topic(),
e.toString());
}
+
+ if (numRecords == 0) {
+ parseFailuresPerTopic.merge(consumerRecord.topic(), 1,
Integer::sum);
+ latestParseFailureDescription.put(consumerRecord.topic(),
"Received Kafka message but Record Reader produced no Record from it");
+ recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
+ } else {
+ recordsPerTopic.merge(consumerRecord.topic(), numRecords,
Integer::sum);
+ }
}
// Note here that we do not commit the offsets. We will just let the
consumer close without committing the offsets, which
// will roll back the consumption of the messages.
- if (recordsPerTopic.isEmpty()) {
- return new ConfigVerificationResult.Builder()
- .verificationStepName("Parse Records")
- .outcome(Outcome.SKIPPED)
- .explanation("Received no messages to attempt parsing within
the 30 second timeout")
- .build();
- }
-
if (parseFailuresPerTopic.isEmpty()) {
+ if (recordsPerTopic.isEmpty()) {
+ return new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Records")
+ .outcome(Outcome.SKIPPED)
+ .explanation("Received no messages to attempt parsing
within the 30 second timeout")
+ .build();
+ }
+
return new ConfigVerificationResult.Builder()
.verificationStepName("Parse Records")
.outcome(Outcome.SUCCESSFUL)
.explanation("Was able to parse all Records consumed from
topics. Number of Records consumed from each topic: " + recordsPerTopic)
.build();
- } else {
- final Map<String, String> failureDescriptions = new HashMap<>();
- for (final String topic : recordsPerTopic.keySet()) {
- final int records = recordsPerTopic.get(topic);
- final Integer failures = parseFailuresPerTopic.get(topic);
- final String failureReason =
latestParseFailureDescription.get(topic);
- final String description = "Failed to parse " + failures + "
out of " + records + " records. Sample failure reason: " + failureReason;
- failureDescriptions.put(topic, description);
- }
+ }
- return new ConfigVerificationResult.Builder()
- .verificationStepName("Parse Records")
- .outcome(Outcome.FAILED)
- .explanation("With the configured Record Reader, failed to
parse at least one Record. Failures per topic: " + failureDescriptions)
- .build();
+ final Map<String, String> failureDescriptions = new HashMap<>();
+ for (final String topic : recordsPerTopic.keySet()) {
+ final int records = recordsPerTopic.get(topic);
+ final Integer failures = parseFailuresPerTopic.get(topic);
+ final String failureReason =
latestParseFailureDescription.get(topic);
+ final String description = "Failed to parse " + failures + " out
of " + records + " records. Sample failure reason: " + failureReason;
+ failureDescriptions.put(topic, description);
}
+
+ return new ConfigVerificationResult.Builder()
+ .verificationStepName("Parse Records")
+ .outcome(Outcome.FAILED)
+ .explanation("With the configured Record Reader, failed to parse
at least one Record. Failures per topic: " + failureDescriptions)
+ .build();
}
/**