This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0aa100a0a5 NIFI-14059 - add properties needed to use Kafka 
authentication mechanism SASL_SSL
0aa100a0a5 is described below

commit 0aa100a0a5dc212be72595e396bd159b6d453d29
Author: Paul Grey <[email protected]>
AuthorDate: Tue Dec 31 15:17:12 2024 -0500

    NIFI-14059 - add properties needed to use Kafka authentication mechanism 
SASL_SSL
    
    NIFI-14059 - Kafka/KerberosUserService compatibility
    
    Signed-off-by: Matt Burgess <[email protected]>
    
    This closes #9607
---
 .../nifi/kafka/service/Kafka3ConnectionService.java       | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
index a212709407..72f336d21c 100644
--- 
a/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
+++ 
b/nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java
@@ -53,6 +53,7 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism;
 import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
 import 
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
 import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
@@ -70,6 +71,7 @@ import java.util.regex.Pattern;
 
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
 import static 
org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+import static 
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
 import static 
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
@@ -146,6 +148,14 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
             )
             .build();
 
+    public static final PropertyDescriptor 
SELF_CONTAINED_KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
+            .name("kerberos-user-service")
+            .displayName("Kerberos User Service")
+            .description("Service supporting user authentication with 
Kerberos")
+            
.identifiesControllerService(SelfContainedKerberosUserService.class)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("Service supporting SSL communication with Kafka 
brokers")
@@ -220,6 +230,8 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
             SASL_MECHANISM,
             SASL_USERNAME,
             SASL_PASSWORD,
+            SELF_CONTAINED_KERBEROS_USER_SERVICE,
+            KERBEROS_SERVICE_NAME,
             SSL_CONTEXT_SERVICE,
             TRANSACTION_ISOLATION_LEVEL,
             MAX_POLL_RECORDS,
@@ -272,8 +284,7 @@ public class Kafka3ConnectionService extends 
AbstractControllerService implement
             consumer.subscribe(topics);
         }
 
-        final Kafka3ConsumerService consumerService = new 
Kafka3ConsumerService(getLogger(), consumer, subscription, 
pollingContext.getMaxUncommittedTime());
-        return consumerService;
+        return new Kafka3ConsumerService(getLogger(), consumer, subscription, 
pollingContext.getMaxUncommittedTime());
     }
 
     private Subscription createSubscription(final PollingContext 
pollingContext) {

Reply via email to