This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e6229ab  NIFI-9771: When a Kafka record is obtained during config 
verification, we should produce an invalid response if the Record Reader is not 
able to produce any records from it
e6229ab is described below

commit e6229ab938571f5339805797b6a6ffeebcfad652
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 7 11:54:37 2022 -0500

    NIFI-9771: When a Kafka record is obtained during config verification, we 
should produce an invalid response if the Record Reader is not able to produce 
any records from it
    
    Signed-off-by: Joe Gresock <[email protected]>
    
    This closes #5847.
---
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 57 +++++++++++++---------
 1 file changed, 33 insertions(+), 24 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 6c4cc49..e8e0311 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -24,9 +24,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.serialization.RecordReader;
@@ -403,52 +403,61 @@ public class ConsumerPool implements Closeable {
         final Map<String, Integer> recordsPerTopic = new HashMap<>();
 
         for (final ConsumerRecord<byte[], byte[]> consumerRecord : 
consumerRecords) {
-            recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
             final Map<String, String> attributes = 
consumerLease.getAttributes(consumerRecord);
 
+            int numRecords = 0;
             final byte[] recordBytes = consumerRecord.value() == null ? new 
byte[0] : consumerRecord.value();
             try (final InputStream in = new ByteArrayInputStream(recordBytes)) 
{
                 final RecordReader reader = 
readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
                 while (reader.nextRecord() != null) {
+                    numRecords++;
                 }
             } catch (final Exception e) {
                 parseFailuresPerTopic.merge(consumerRecord.topic(), 1, 
Integer::sum);
                 latestParseFailureDescription.put(consumerRecord.topic(), 
e.toString());
             }
+
+            if (numRecords == 0) {
+                parseFailuresPerTopic.merge(consumerRecord.topic(), 1, 
Integer::sum);
+                latestParseFailureDescription.put(consumerRecord.topic(), 
"Received Kafka message but Record Reader produced no Record from it");
+                recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
+            } else {
+                recordsPerTopic.merge(consumerRecord.topic(), numRecords, 
Integer::sum);
+            }
         }
 
         // Note here that we do not commit the offsets. We will just let the 
consumer close without committing the offsets, which
         // will roll back the consumption of the messages.
-        if (recordsPerTopic.isEmpty()) {
-            return new ConfigVerificationResult.Builder()
-                .verificationStepName("Parse Records")
-                .outcome(Outcome.SKIPPED)
-                .explanation("Received no messages to attempt parsing within 
the 30 second timeout")
-                .build();
-        }
-
         if (parseFailuresPerTopic.isEmpty()) {
+            if (recordsPerTopic.isEmpty()) {
+                return new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Records")
+                    .outcome(Outcome.SKIPPED)
+                    .explanation("Received no messages to attempt parsing 
within the 30 second timeout")
+                    .build();
+            }
+
             return new ConfigVerificationResult.Builder()
                 .verificationStepName("Parse Records")
                 .outcome(Outcome.SUCCESSFUL)
                 .explanation("Was able to parse all Records consumed from 
topics. Number of Records consumed from each topic: " + recordsPerTopic)
                 .build();
-        } else {
-            final Map<String, String> failureDescriptions = new HashMap<>();
-            for (final String topic : recordsPerTopic.keySet()) {
-                final int records = recordsPerTopic.get(topic);
-                final Integer failures = parseFailuresPerTopic.get(topic);
-                final String failureReason = 
latestParseFailureDescription.get(topic);
-                final String description = "Failed to parse " + failures + " 
out of " + records + " records. Sample failure reason: " + failureReason;
-                failureDescriptions.put(topic, description);
-            }
+        }
 
-            return new ConfigVerificationResult.Builder()
-                .verificationStepName("Parse Records")
-                .outcome(Outcome.FAILED)
-                .explanation("With the configured Record Reader, failed to 
parse at least one Record. Failures per topic: " + failureDescriptions)
-                .build();
+        final Map<String, String> failureDescriptions = new HashMap<>();
+        for (final String topic : recordsPerTopic.keySet()) {
+            final int records = recordsPerTopic.get(topic);
+            final Integer failures = parseFailuresPerTopic.get(topic);
+            final String failureReason = 
latestParseFailureDescription.get(topic);
+            final String description = "Failed to parse " + failures + " out 
of " + records + " records. Sample failure reason: " + failureReason;
+            failureDescriptions.put(topic, description);
         }
+
+        return new ConfigVerificationResult.Builder()
+            .verificationStepName("Parse Records")
+            .outcome(Outcome.FAILED)
+            .explanation("With the configured Record Reader, failed to parse 
at least one Record. Failures per topic: " + failureDescriptions)
+            .build();
     }
 
     /**

Reply via email to