This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 50d26c7dc3 NIFI-14568 Fixed Property Verification in Kafka Connection Service
This closes #9945
50d26c7dc3 is described below
commit 50d26c7dc39f0bad449f896b24d177b5585b96a0
Author: exceptionfactory <[email protected]>
AuthorDate: Wed May 14 17:13:39 2025 -0500
NIFI-14568 Fixed Property Verification in Kafka Connection Service
This closes #9945
- Corrected properties construction for Kafka Admin Client in verify method
Signed-off-by: Joseph Witt <[email protected]>
---
.../apache/nifi/kafka/service/Kafka3ConnectionService.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index b9ee5ddce5..bed6590ac3 100644
--- a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -250,7 +250,6 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
private static final String CONNECTION_STEP = "Kafka Broker Connection";
private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
- private volatile Properties clientProperties;
private volatile ServiceConfiguration serviceConfiguration;
private volatile Properties producerProperties;
private volatile Properties consumerProperties;
@@ -258,7 +257,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
@OnEnabled
public void onEnabled(final ConfigurationContext configurationContext) {
- clientProperties = getClientProperties(configurationContext);
+ final Properties clientProperties =
getClientProperties(configurationContext);
serviceConfiguration = getServiceConfiguration(configurationContext);
producerProperties = getProducerProperties(configurationContext,
clientProperties);
consumerProperties = getConsumerProperties(configurationContext,
clientProperties);
@@ -312,8 +311,7 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer = new
KafkaConsumer<>(properties, deserializer, deserializer);
- final Kafka3ConsumerService consumerService = new
Kafka3ConsumerService(getLogger(), consumer, subscription);
- return consumerService;
+ return new Kafka3ConsumerService(getLogger(), consumer, subscription);
}
private Subscription createSubscription(final PollingContext
pollingContext) {
@@ -360,9 +358,11 @@ public class Kafka3ConnectionService extends
AbstractControllerService implement
public List<ConfigVerificationResult> verify(final ConfigurationContext
configurationContext, final ComponentLog verificationLogger, final Map<String,
String> variables) {
final List<ConfigVerificationResult> results = new ArrayList<>();
+ // Build Admin Client Properties based on configured values and defaults from Consumer Properties
final Properties clientProperties =
getClientProperties(configurationContext);
- clientProperties.putAll(variables);
- try (final Admin admin = Admin.create(clientProperties)) {
+ final Properties consumerProperties =
getConsumerProperties(configurationContext, clientProperties);
+ consumerProperties.putAll(variables);
+ try (final Admin admin = Admin.create(consumerProperties)) {
final ListTopicsResult listTopicsResult = admin.listTopics();
final KafkaFuture<Collection<TopicListing>> requestedListings =
listTopicsResult.listings();