This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 50d26c7dc3 NIFI-14568 Fixed Property Verification in Kafka Connection 
Service This closes #9945
50d26c7dc3 is described below

commit 50d26c7dc39f0bad449f896b24d177b5585b96a0
Author: exceptionfactory <[email protected]>
AuthorDate: Wed May 14 17:13:39 2025 -0500

    NIFI-14568 Fixed Property Verification in Kafka Connection Service
    This closes #9945
    
    - Corrected properties construction for Kafka Admin Client in verify method
    
    Signed-off-by: Joseph Witt <[email protected]>
---
 .../apache/nifi/kafka/service/Kafka3ConnectionService.java   | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index b9ee5ddce5..bed6590ac3 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -250,7 +250,6 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
     private static final String CONNECTION_STEP = "Kafka Broker Connection";
     private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
 
-    private volatile Properties clientProperties;
     private volatile ServiceConfiguration serviceConfiguration;
     private volatile Properties producerProperties;
     private volatile Properties consumerProperties;
@@ -258,7 +257,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
 
     @OnEnabled
     public void onEnabled(final ConfigurationContext configurationContext) {
-        clientProperties = getClientProperties(configurationContext);
+        final Properties clientProperties = 
getClientProperties(configurationContext);
         serviceConfiguration = getServiceConfiguration(configurationContext);
         producerProperties = getProducerProperties(configurationContext, 
clientProperties);
         consumerProperties = getConsumerProperties(configurationContext, 
clientProperties);
@@ -312,8 +311,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
         final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
         final Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(properties, deserializer, deserializer);
 
-        final Kafka3ConsumerService consumerService = new 
Kafka3ConsumerService(getLogger(), consumer, subscription);
-        return consumerService;
+        return new Kafka3ConsumerService(getLogger(), consumer, subscription);
     }
 
     private Subscription createSubscription(final PollingContext 
pollingContext) {
@@ -360,9 +358,11 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
     public List<ConfigVerificationResult> verify(final ConfigurationContext 
configurationContext, final ComponentLog verificationLogger, final Map<String, 
String> variables) {
         final List<ConfigVerificationResult> results = new ArrayList<>();
 
+        // Build Admin Client Properties based on configured values and 
defaults from Consumer Properties
         final Properties clientProperties = 
getClientProperties(configurationContext);
-        clientProperties.putAll(variables);
-        try (final Admin admin = Admin.create(clientProperties)) {
+        final Properties consumerProperties = 
getConsumerProperties(configurationContext, clientProperties);
+        consumerProperties.putAll(variables);
+        try (final Admin admin = Admin.create(consumerProperties)) {
             final ListTopicsResult listTopicsResult = admin.listTopics();
 
             final KafkaFuture<Collection<TopicListing>> requestedListings = 
listTopicsResult.listings();

Reply via email to