This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f90a92146d NIFI-15286 Added connection verification to Kafka 
Connection Services (#10689)
f90a92146d is described below

commit f90a92146d4d018611a0879f97f019ad9b1dd5ed
Author: David Handermann <[email protected]>
AuthorDate: Tue Dec 23 21:41:47 2025 -0600

    NIFI-15286 Added connection verification to Kafka Connection Services 
(#10689)
    
    Co-authored-by: Dariusz Seweryn <[email protected]>
---
 .../kafka/service/Kafka3ConnectionService.java     |  41 +--
 .../kafka/service/KafkaConnectionVerifier.java     | 317 +++++++++++++++++++++
 .../service/Kafka3ConnectionServiceBaseIT.java     |  82 ++++--
 3 files changed, 383 insertions(+), 57 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index b29a444932..72f58cf771 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -16,15 +16,11 @@
  */
 package org.apache.nifi.kafka.service;
 
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.ListTopicsResult;
-import org.apache.kafka.clients.admin.TopicListing;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -63,8 +59,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -73,8 +67,6 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
-import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
 import static 
org.apache.nifi.kafka.service.security.OAuthBearerLoginCallbackHandler.PROPERTY_KEY_NIFI_OAUTH_2_ACCESS_TOKEN_PROVIDER;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
@@ -176,9 +168,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
             ACK_WAIT_TIME
     );
 
-    private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(2);
-    private static final String CONNECTION_STEP = "Kafka Broker Connection";
-    private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
+    private static final KafkaConnectionVerifier kafkaConnectionVerifier = new 
KafkaConnectionVerifier();
 
     private volatile ServiceConfiguration serviceConfiguration;
     private volatile Properties producerProperties;
@@ -303,37 +293,12 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
 
     @Override
     public List<ConfigVerificationResult> verify(final ConfigurationContext 
configurationContext, final ComponentLog verificationLogger, final Map<String, 
String> variables) {
-        final List<ConfigVerificationResult> results = new ArrayList<>();
-
-        // Build Admin Client Properties based on configured values and 
defaults from Consumer Properties
+        // Build Client Properties based on configured values and defaults 
from Consumer Properties
         final Properties clientProperties = 
getClientProperties(configurationContext);
         final Properties consumerProperties = 
getConsumerProperties(configurationContext, clientProperties);
         consumerProperties.putAll(variables);
-        try (final Admin admin = Admin.create(consumerProperties)) {
-            final ListTopicsResult listTopicsResult = admin.listTopics();
-
-            final KafkaFuture<Collection<TopicListing>> requestedListings = 
listTopicsResult.listings();
-            final Collection<TopicListing> topicListings = 
requestedListings.get(VERIFY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
-            final String topicListingExplanation = String.format("Topics Found 
[%d]", topicListings.size());
-            results.add(
-                    new ConfigVerificationResult.Builder()
-                            .verificationStepName(TOPIC_LISTING_STEP)
-                            .outcome(SUCCESSFUL)
-                            .explanation(topicListingExplanation)
-                            .build()
-            );
-        } catch (final Exception e) {
-            verificationLogger.error("Kafka Broker verification failed", e);
-            results.add(
-                    new ConfigVerificationResult.Builder()
-                            .verificationStepName(CONNECTION_STEP)
-                            .outcome(FAILED)
-                            .explanation(e.getMessage())
-                            .build()
-            );
-        }
 
-        return results;
+        return kafkaConnectionVerifier.verify(verificationLogger, 
consumerProperties);
     }
 
     protected Properties getProducerProperties(final PropertyContext 
propertyContext, final Properties defaultProperties) {
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaConnectionVerifier.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaConnectionVerifier.java
new file mode 100644
index 0000000000..06b9cb1665
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/KafkaConnectionVerifier.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.DescribeClusterOptions;
+import org.apache.kafka.clients.admin.DescribeClusterResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
+import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SKIPPED;
+import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+
+class KafkaConnectionVerifier {
+    static final String ADDRESSES_STEP = "Broker Addresses";
+    static final String CONFIGURATION_STEP = "Broker Configuration";
+    static final String BROKER_CONNECTION_STEP = "Broker Connection";
+    static final String NODE_CONNECTION_STEP = "Node Connection";
+    static final String CLUSTER_DESCRIPTION_STEP = "Cluster Description";
+    static final String CLUSTER_CONNECTION_STEP = "Cluster Connection";
+    static final String TOPIC_LISTING_STEP = "Topic Listing";
+
+    private static final int SOCKET_CONNECT_TIMEOUT = 2500;
+    private static final int VERIFY_TIMEOUT = 5000;
+    private static final TimeUnit VERIFY_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+    /**
+     * Verify Connection to Kafka Brokers
+     *
+     * @param verificationLogger Logger specific to verification warnings and 
errors
+     * @param resolvedProperties Resolved properties from Controller Service 
implementation containing required values for communicating with Kafka Brokers
+     *
+     * @return Verification Results
+     */
+    List<ConfigVerificationResult> verify(final ComponentLog 
verificationLogger, final Properties resolvedProperties) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        // Get validated Bootstrap Addresses before checking socket connections
+        final List<InetSocketAddress> bootstrapAddresses = 
getBootstrapAddresses(verificationLogger, resolvedProperties, results);
+        if (bootstrapAddresses.isEmpty()) {
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(CONFIGURATION_STEP)
+                            .outcome(FAILED)
+                            .explanation("Validated Bootstrap Servers not 
found")
+                            .build()
+            );
+        } else {
+            // Get connected Bootstrap Addresses before checking Topics
+            final List<InetSocketAddress> connectedAddresses = 
getConnectedAddresses(verificationLogger, results, bootstrapAddresses);
+            if (connectedAddresses.isEmpty()) {
+                results.add(
+                        new ConfigVerificationResult.Builder()
+                                .verificationStepName(BROKER_CONNECTION_STEP)
+                                .outcome(FAILED)
+                                .explanation("Connected Bootstrap Servers not 
found")
+                                .build()
+                );
+            } else {
+                try (final Admin admin = Admin.create(resolvedProperties)) {
+                    final List<Node> clusterNodes = 
getClusterNodes(verificationLogger, admin, results);
+                    if (clusterNodes.isEmpty()) {
+                        verificationLogger.info("Cluster Nodes not found");
+                    } else {
+                        verifyClusterNodes(verificationLogger, 
connectedAddresses, clusterNodes, results);
+                    }
+
+                    // Verify Topics regardless of Cluster Node status because 
Describe Cluster is independent of listing Topics
+                    verifyTopics(verificationLogger, admin, results);
+                }
+            }
+        }
+
+        return results;
+    }
+
+    private List<InetSocketAddress> getBootstrapAddresses(final ComponentLog 
verificationLogger, final Properties resolvedProperties, final 
List<ConfigVerificationResult> results) {
+        final List<InetSocketAddress> bootstrapAddresses = new ArrayList<>();
+
+        try {
+            final AdminClientConfig adminClientConfig = new 
AdminClientConfig(resolvedProperties);
+            final List<InetSocketAddress> validatedAddresses = 
ClientUtils.parseAndValidateAddresses(adminClientConfig);
+            bootstrapAddresses.addAll(validatedAddresses);
+
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(ADDRESSES_STEP)
+                            .outcome(SUCCESSFUL)
+                            .explanation("Addresses validated 
[%d]".formatted(validatedAddresses.size()))
+                            .build()
+            );
+        } catch (final Exception e) {
+            verificationLogger.error("Broker Address verification failed", e);
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(ADDRESSES_STEP)
+                            .outcome(FAILED)
+                            .explanation(e.getMessage())
+                            .build()
+            );
+        }
+
+        return bootstrapAddresses;
+    }
+
+    private List<InetSocketAddress> getConnectedAddresses(final ComponentLog 
verificationLogger, final List<ConfigVerificationResult> results, final 
List<InetSocketAddress> bootstrapAddresses) {
+        final List<InetSocketAddress> connectedAddresses = 
Collections.synchronizedList(new ArrayList<>());
+
+        try (ExecutorService executorService = 
Executors.newVirtualThreadPerTaskExecutor()) {
+            final List<Callable<ConfigVerificationResult>> 
socketConnectionTasks = getConnectionTasks(bootstrapAddresses, 
connectedAddresses);
+
+            final List<Future<ConfigVerificationResult>> futures = 
executorService.invokeAll(socketConnectionTasks, VERIFY_TIMEOUT, 
VERIFY_TIMEOUT_UNIT);
+            for (final Future<ConfigVerificationResult> future : futures) {
+                final ConfigVerificationResult result = future.get();
+                results.add(result);
+            }
+        } catch (final Exception e) {
+            verificationLogger.error("Broker Connection verification failed", 
e);
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(BROKER_CONNECTION_STEP)
+                            .outcome(FAILED)
+                            .explanation(e.getMessage())
+                            .build()
+            );
+        }
+
+        return connectedAddresses;
+    }
+
+    private List<Callable<ConfigVerificationResult>> getConnectionTasks(final 
List<InetSocketAddress> addresses, final List<InetSocketAddress> 
connectedAddresses) {
+        final List<Callable<ConfigVerificationResult>> socketConnectionTasks = 
new ArrayList<>();
+        for (final InetSocketAddress address : addresses) {
+            final Callable<ConfigVerificationResult> socketConnectionTask = () 
-> {
+                final ConfigVerificationResult result = 
verifySocketConnection(address);
+                final ConfigVerificationResult.Outcome outcome = 
result.getOutcome();
+                if (SUCCESSFUL == outcome) {
+                    connectedAddresses.add(address);
+                }
+                return result;
+            };
+            socketConnectionTasks.add(socketConnectionTask);
+        }
+        return socketConnectionTasks;
+    }
+
+    private List<Node> getClusterNodes(final ComponentLog verificationLogger, 
final Admin admin, final List<ConfigVerificationResult> results) {
+        final List<Node> clusterNodes = new ArrayList<>();
+
+        final DescribeClusterOptions describeClusterOptions = new 
DescribeClusterOptions().timeoutMs(VERIFY_TIMEOUT);
+        try {
+            final DescribeClusterResult describeClusterResult = 
admin.describeCluster(describeClusterOptions);
+            final KafkaFuture<Collection<Node>> resultNodesFuture = 
describeClusterResult.nodes();
+            final Collection<Node> resultNodes = 
resultNodesFuture.get(VERIFY_TIMEOUT, VERIFY_TIMEOUT_UNIT);
+            clusterNodes.addAll(resultNodes);
+
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(CLUSTER_DESCRIPTION_STEP)
+                            .outcome(SUCCESSFUL)
+                            .explanation("Cluster Nodes found 
[%d]".formatted(resultNodes.size()))
+                            .build()
+            );
+        } catch (final Exception e) {
+            if (hasAuthorizationException(e)) {
+                verificationLogger.warn("Describe Cluster authorization 
failed", e);
+                results.add(
+                        new ConfigVerificationResult.Builder()
+                                .verificationStepName(CLUSTER_DESCRIPTION_STEP)
+                                .outcome(SKIPPED)
+                                .explanation(e.getMessage())
+                                .build()
+                );
+            } else {
+                verificationLogger.warn("Describe Cluster failed", e);
+                results.add(
+                        new ConfigVerificationResult.Builder()
+                                .verificationStepName(CLUSTER_DESCRIPTION_STEP)
+                                .outcome(FAILED)
+                                .explanation(e.getMessage())
+                                .build()
+                );
+            }
+        }
+
+        return clusterNodes;
+    }
+
+    private ConfigVerificationResult verifySocketConnection(final 
InetSocketAddress socketAddress) {
+        final ConfigVerificationResult.Builder builder = new 
ConfigVerificationResult.Builder().verificationStepName(NODE_CONNECTION_STEP);
+
+        try (Socket socket = new Socket()) {
+            socket.connect(socketAddress, SOCKET_CONNECT_TIMEOUT);
+            builder.outcome(SUCCESSFUL);
+            builder.explanation("[%s:%d] socket 
connected".formatted(socketAddress.getHostString(), socketAddress.getPort()));
+        } catch (final IOException e) {
+            builder.outcome(FAILED);
+            builder.explanation("[%s:%d] connection 
failed".formatted(socketAddress.getHostString(), socketAddress.getPort()));
+        }
+
+        return builder.build();
+    }
+
+    private boolean hasAuthorizationException(final Exception e) {
+        final Throwable cause;
+
+        if (e instanceof ExecutionException executionException) {
+            cause = executionException.getCause();
+        } else {
+            cause = e.getCause();
+        }
+
+        return cause instanceof AuthorizationException;
+    }
+
+    private void verifyClusterNodes(
+            final ComponentLog verificationLogger,
+            final List<InetSocketAddress> connectedAddresses,
+            final List<Node> clusterNodes,
+            final List<ConfigVerificationResult> results
+    ) {
+        // Build list of Cluster Node Addresses not found in connected 
Bootstrap Addresses
+        final List<InetSocketAddress> nodeAddresses = new ArrayList<>();
+        for (final Node clusterNode : clusterNodes) {
+            final InetSocketAddress nodeAddress = new 
InetSocketAddress(clusterNode.host(), clusterNode.port());
+            if (connectedAddresses.contains(nodeAddress)) {
+                continue;
+            }
+            nodeAddresses.add(nodeAddress);
+        }
+        if (nodeAddresses.isEmpty()) {
+            verificationLogger.info("Cluster Nodes match connected Bootstrap 
Servers");
+        } else {
+            final List<InetSocketAddress> connectedNodeAddresses = 
getConnectedAddresses(verificationLogger, results, nodeAddresses);
+            if (connectedNodeAddresses.isEmpty()) {
+                results.add(
+                        new ConfigVerificationResult.Builder()
+                                .verificationStepName(CLUSTER_CONNECTION_STEP)
+                                .outcome(FAILED)
+                                .explanation("Connected Cluster Nodes not 
found")
+                                .build()
+                );
+            } else {
+                results.add(
+                        new ConfigVerificationResult.Builder()
+                                .verificationStepName(CLUSTER_CONNECTION_STEP)
+                                .outcome(SUCCESSFUL)
+                                .explanation("Connected Cluster Nodes found 
[%d]".formatted(clusterNodes.size()))
+                                .build()
+                );
+            }
+        }
+    }
+
+    private void verifyTopics(final ComponentLog verificationLogger, final 
Admin admin, final List<ConfigVerificationResult> results) {
+        try {
+            final ListTopicsResult listTopicsResult = admin.listTopics();
+            final KafkaFuture<Collection<TopicListing>> requestedListings = 
listTopicsResult.listings();
+            final Collection<TopicListing> topicListings = 
requestedListings.get(VERIFY_TIMEOUT, VERIFY_TIMEOUT_UNIT);
+
+            final String topicListingExplanation = "Topics found 
[%d]".formatted(topicListings.size());
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(TOPIC_LISTING_STEP)
+                            .outcome(SUCCESSFUL)
+                            .explanation(topicListingExplanation)
+                            .build()
+            );
+        } catch (final Exception e) {
+            verificationLogger.error("Topic Listing verification failed", e);
+            results.add(
+                    new ConfigVerificationResult.Builder()
+                            .verificationStepName(TOPIC_LISTING_STEP)
+                            .outcome(FAILED)
+                            .explanation(e.getMessage())
+                            .build()
+            );
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
index d466580723..3b31c9c69a 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/test/java/org/apache/nifi/kafka/service/Kafka3ConnectionServiceBaseIT.java
@@ -288,36 +288,80 @@ public class Kafka3ConnectionServiceBaseIT {
 
     @Test
     void testVerifySuccessful() {
-        final Map<PropertyDescriptor, String> properties = new 
LinkedHashMap<>();
-        properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS, 
kafkaContainer.getBootstrapServers());
+        final String bootstrapServers = kafkaContainer.getBootstrapServers();
+        final Map<PropertyDescriptor, String> properties = Map.of(
+                Kafka3ConnectionService.BOOTSTRAP_SERVERS, bootstrapServers,
+                Kafka3ConnectionService.CLIENT_TIMEOUT, CLIENT_TIMEOUT
+        );
         final MockConfigurationContext configurationContext = new 
MockConfigurationContext(properties, null, null);
 
-        final List<ConfigVerificationResult> results = service.verify(
-                configurationContext, runner.getLogger(), 
getAdminClientConfigProperties());
+        final List<ConfigVerificationResult> results = 
service.verify(configurationContext, runner.getLogger(), 
getAdminClientConfigProperties());
 
-        assertFalse(results.isEmpty());
+        assertEquals(4, results.size());
+        assertResultFound(results, KafkaConnectionVerifier.ADDRESSES_STEP, 
ConfigVerificationResult.Outcome.SUCCESSFUL);
+        assertResultFound(results, 
KafkaConnectionVerifier.NODE_CONNECTION_STEP, 
ConfigVerificationResult.Outcome.SUCCESSFUL, bootstrapServers);
+        assertResultFound(results, 
KafkaConnectionVerifier.CLUSTER_DESCRIPTION_STEP, 
ConfigVerificationResult.Outcome.SUCCESSFUL);
+        assertResultFound(results, KafkaConnectionVerifier.TOPIC_LISTING_STEP, 
ConfigVerificationResult.Outcome.SUCCESSFUL);
+    }
+
+    @Test
+    void testVerifyAddressesFailed() {
+        final String bootstrapServers = "127.0.0.1:707070";
+        final Map<PropertyDescriptor, String> properties = Map.of(
+                Kafka3ConnectionService.BOOTSTRAP_SERVERS, bootstrapServers,
+                Kafka3ConnectionService.CLIENT_TIMEOUT, CLIENT_TIMEOUT
+        );
+        final MockConfigurationContext configurationContext = new 
MockConfigurationContext(properties, null, null);
 
-        final ConfigVerificationResult firstResult = results.iterator().next();
-        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
firstResult.getOutcome());
-        assertNotNull(firstResult.getExplanation());
+        final List<ConfigVerificationResult> results = 
service.verify(configurationContext, runner.getLogger(), 
getAdminClientConfigProperties());
+
+        assertEquals(2, results.size());
+        assertResultFound(results, KafkaConnectionVerifier.ADDRESSES_STEP, 
ConfigVerificationResult.Outcome.FAILED, bootstrapServers);
+        assertResultFound(results, KafkaConnectionVerifier.CONFIGURATION_STEP, 
ConfigVerificationResult.Outcome.FAILED);
     }
 
     @Test
-    void testVerifyFailed() {
-        final Map<PropertyDescriptor, String> properties = new 
LinkedHashMap<>();
-        properties.put(Kafka3ConnectionService.BOOTSTRAP_SERVERS, 
UNREACHABLE_BOOTSTRAP_SERVERS);
-        properties.put(Kafka3ConnectionService.CLIENT_TIMEOUT, CLIENT_TIMEOUT);
+    void testVerifyConnectionFailed() {
+        final Map<PropertyDescriptor, String> properties = Map.of(
+                Kafka3ConnectionService.BOOTSTRAP_SERVERS, 
UNREACHABLE_BOOTSTRAP_SERVERS,
+                Kafka3ConnectionService.CLIENT_TIMEOUT, CLIENT_TIMEOUT
+        );
+        final MockConfigurationContext configurationContext = new 
MockConfigurationContext(properties, null, null);
 
-        final MockConfigurationContext configurationContext = new 
MockConfigurationContext(
-                properties, null, null);
+        final List<ConfigVerificationResult> results = 
service.verify(configurationContext, runner.getLogger(), 
getAdminClientConfigProperties());
 
-        final List<ConfigVerificationResult> results = service.verify(
-                configurationContext, runner.getLogger(), 
getAdminClientConfigProperties());
+        assertEquals(3, results.size());
+        assertResultFound(results, KafkaConnectionVerifier.ADDRESSES_STEP, 
ConfigVerificationResult.Outcome.SUCCESSFUL);
+        assertResultFound(results, 
KafkaConnectionVerifier.NODE_CONNECTION_STEP, 
ConfigVerificationResult.Outcome.FAILED, UNREACHABLE_BOOTSTRAP_SERVERS);
+        assertResultFound(results, 
KafkaConnectionVerifier.BROKER_CONNECTION_STEP, 
ConfigVerificationResult.Outcome.FAILED);
+    }
 
-        assertFalse(results.isEmpty());
+    private void assertResultFound(
+            final List<ConfigVerificationResult> results,
+            final String verifiedStepName,
+            final ConfigVerificationResult.Outcome outcome
+    ) {
+        final Optional<ConfigVerificationResult> resultFound = results.stream()
+                .filter(result -> 
result.getVerificationStepName().contains(verifiedStepName))
+                .filter(result -> result.getOutcome().equals(outcome))
+                .findFirst();
+        assertTrue(resultFound.isPresent(), "Result Step [%s] with Outcome 
[%s] not found".formatted(verifiedStepName, outcome));
+    }
 
-        final ConfigVerificationResult firstResult = results.iterator().next();
-        assertEquals(ConfigVerificationResult.Outcome.FAILED, 
firstResult.getOutcome());
+    private void assertResultFound(
+            final List<ConfigVerificationResult> results,
+            final String verifiedStepName,
+            final ConfigVerificationResult.Outcome outcome,
+            final String bootstrapServers
+    ) {
+        final Optional<ConfigVerificationResult> bootstrapServerResultFound = 
results.stream()
+                .filter(result -> 
result.getExplanation().contains(bootstrapServers))
+                .filter(result -> result.getOutcome().equals(outcome))
+                .findFirst();
+        assertTrue(bootstrapServerResultFound.isPresent(), "Result Step not 
found referencing Bootstrap Servers [%s]".formatted(bootstrapServers));
+
+        final ConfigVerificationResult bootstrapServerResult = 
bootstrapServerResultFound.get();
+        assertEquals(verifiedStepName, 
bootstrapServerResult.getVerificationStepName());
     }
 
     @Test

Reply via email to