This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ed1cee4520 NIFI-11202 Removed deprecated Kerberos properties from
Kafka 2_6
ed1cee4520 is described below
commit ed1cee4520fc47c01bc19abc8bde81bf5c30999d
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Mon Feb 20 00:00:42 2023 +0100
NIFI-11202 Removed deprecated Kerberos properties from Kafka 2_6
This closes #6978
Signed-off-by: David Handermann <[email protected]>
---
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 4 +-
.../processors/kafka/pubsub/ConsumeKafka_2_6.java | 3 -
.../kafka/pubsub/PublishKafkaRecord_2_6.java | 3 -
.../processors/kafka/pubsub/PublishKafka_2_6.java | 3 -
.../record/sink/kafka/KafkaRecordSink_2_6.java | 2 +-
.../kafka/pubsub/TestConsumeKafkaRecord_2_6.java | 23 +++---
.../kafka/pubsub/TestConsumeKafka_2_6.java | 40 +---------
.../record/sink/kafka/TestKafkaRecordSink_2_6.java | 4 -
.../shared/component/KafkaClientComponent.java | 29 -------
.../KerberosCredentialsLoginConfigProvider.java | 54 -------------
.../KerberosDelegatingLoginConfigProvider.java | 15 +---
.../KafkaClientCustomValidationFunction.java | 89 +---------------------
.../KafkaClientCustomValidationFunctionTest.java | 33 --------
13 files changed, 20 insertions(+), 282 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 025a830552..1fd4963413 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -313,10 +313,8 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor implements KafkaCl
descriptors.add(HONOR_TRANSACTIONS);
descriptors.add(SECURITY_PROTOCOL);
descriptors.add(SASL_MECHANISM);
- descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
+ descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
descriptors.add(KERBEROS_SERVICE_NAME);
- descriptors.add(KERBEROS_PRINCIPAL);
- descriptors.add(KERBEROS_KEYTAB);
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 076cd474bb..2ac988b291 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -266,11 +266,8 @@ public class ConsumeKafka_2_6 extends AbstractProcessor
implements KafkaClientCo
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(SECURITY_PROTOCOL);
descriptors.add(SASL_MECHANISM);
- descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
descriptors.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
descriptors.add(KERBEROS_SERVICE_NAME);
- descriptors.add(KERBEROS_PRINCIPAL);
- descriptors.add(KERBEROS_KEYTAB);
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index 8b886584c5..b603269de2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -342,11 +342,8 @@ public class PublishKafkaRecord_2_6 extends
AbstractProcessor implements KafkaPu
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
properties.add(SASL_MECHANISM);
- properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
- properties.add(KERBEROS_PRINCIPAL);
- properties.add(KERBEROS_KEYTAB);
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(TOKEN_AUTHENTICATION);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index fb7e92c725..c3d7dc9cb9 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -299,11 +299,8 @@ public class PublishKafka_2_6 extends AbstractProcessor
implements KafkaPublishC
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
properties.add(SASL_MECHANISM);
- properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
- properties.add(KERBEROS_PRINCIPAL);
- properties.add(KERBEROS_KEYTAB);
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(AWS_PROFILE_NAME);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
index a71ef6218e..9a4e4c4cc5 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_6.java
@@ -170,7 +170,7 @@ public class KafkaRecordSink_2_6 extends
AbstractControllerService implements Ka
properties.add(DELIVERY_GUARANTEE);
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(SECURITY_PROTOCOL);
- properties.add(KERBEROS_CREDENTIALS_SERVICE);
+ properties.add(SELF_CONTAINED_KERBEROS_USER_SERVICE);
properties.add(KERBEROS_SERVICE_NAME);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(MAX_REQUEST_SIZE);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_6.java
index 677f96a503..4016ee1b32 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_6.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import org.apache.nifi.kerberos.KerberosUserService;
+import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -159,7 +161,7 @@ public class TestConsumeKafkaRecord_2_6 {
}
@Test
- public void validateGetErrorMessages() throws Exception {
+ public void validateGetErrorMessages() {
String groupName = "validateGetErrorMessages";
when(mockConsumerPool.obtainConsumer(any(),
any())).thenReturn(mockLease);
@@ -181,7 +183,7 @@ public class TestConsumeKafkaRecord_2_6 {
}
@Test
- public void testJaasConfigurationWithDefaultMechanism() {
+ public void testJaasConfigurationWithDefaultMechanism() throws
InitializationException {
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.AUTO_OFFSET_RESET,
ConsumeKafkaRecord_2_6.OFFSET_EARLIEST);
@@ -192,13 +194,8 @@ public class TestConsumeKafkaRecord_2_6 {
runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_SERVICE_NAME,
"kafka");
runner.assertNotValid();
- runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_PRINCIPAL,
"[email protected]");
- runner.assertNotValid();
-
- runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB,
"not.A.File");
- runner.assertNotValid();
-
- runner.setProperty(ConsumeKafkaRecord_2_6.KERBEROS_KEYTAB,
"src/test/resources/server.properties");
+ final KerberosUserService kerberosUserService =
enableKerberosUserService(runner);
+
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE,
kerberosUserService.getIdentifier());
runner.assertValid();
}
@@ -278,4 +275,12 @@ public class TestConsumeKafkaRecord_2_6 {
runner.assertValid();
}
+ private SelfContainedKerberosUserService enableKerberosUserService(final
TestRunner runner) throws InitializationException {
+ final SelfContainedKerberosUserService kerberosUserService =
mock(SelfContainedKerberosUserService.class);
+ when(kerberosUserService.getIdentifier()).thenReturn("userService1");
+ runner.addControllerService(kerberosUserService.getIdentifier(),
kerberosUserService);
+ runner.enableControllerService(kerberosUserService);
+ return kerberosUserService;
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
index 31d2d10aaa..d573303f15 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.reporting.InitializationException;
@@ -100,36 +99,12 @@ public class TestConsumeKafka_2_6 {
runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME, "kafka");
runner.assertNotValid();
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL,
"[email protected]");
- runner.assertNotValid();
-
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "not.A.File");
- runner.assertNotValid();
-
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB,
"src/test/resources/server.properties");
- runner.assertValid();
-
- runner.setVariable("keytab", "src/test/resources/server.properties");
- runner.setVariable("principal", "[email protected]");
- runner.setVariable("service", "kafka");
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL,
"${principal}");
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB, "${keytab}");
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME,
"${service}");
- runner.assertValid();
-
final KerberosUserService kerberosUserService =
enableKerberosUserService(runner);
runner.setProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE,
kerberosUserService.getIdentifier());
- runner.assertNotValid();
-
- runner.removeProperty(ConsumeKafka_2_6.KERBEROS_PRINCIPAL);
- runner.removeProperty(ConsumeKafka_2_6.KERBEROS_KEYTAB);
runner.assertValid();
- final KerberosCredentialsService kerberosCredentialsService =
enabledKerberosCredentialsService(runner);
- runner.setProperty(ConsumeKafka_2_6.KERBEROS_CREDENTIALS_SERVICE,
kerberosCredentialsService.getIdentifier());
- runner.assertNotValid();
-
-
runner.removeProperty(ConsumeKafka_2_6.SELF_CONTAINED_KERBEROS_USER_SERVICE);
+ runner.setVariable("service", "kafka");
+ runner.setProperty(ConsumeKafka_2_6.KERBEROS_SERVICE_NAME,
"${service}");
runner.assertValid();
}
@@ -141,15 +116,4 @@ public class TestConsumeKafka_2_6 {
return kerberosUserService;
}
- private KerberosCredentialsService enabledKerberosCredentialsService(final
TestRunner runner) throws InitializationException {
- final KerberosCredentialsService credentialsService =
mock(KerberosCredentialsService.class);
- when(credentialsService.getIdentifier()).thenReturn("credsService1");
- when(credentialsService.getPrincipal()).thenReturn("principal1");
- when(credentialsService.getKeytab()).thenReturn("keytab1");
-
- runner.addControllerService(credentialsService.getIdentifier(),
credentialsService);
- runner.enableControllerService(credentialsService);
- return credentialsService;
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
index fe5898620f..241c92db1a 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_6.java
@@ -27,9 +27,7 @@ import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
-import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.record.sink.RecordSinkService;
@@ -163,8 +161,6 @@ public class TestKafkaRecordSink_2_6 {
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
when(context.getProperty(KafkaRecordSink_2_6.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
-
when(context.getProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
-
when(pValue.asControllerService(KerberosCredentialsService.class)).thenReturn(null);
final ControllerServiceInitializationContext initContext = new
MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(),
logger, stateManager);
task.initialize(initContext);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index d958997053..5650fab808 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -17,12 +17,9 @@
package org.apache.nifi.kafka.shared.component;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
@@ -137,32 +134,6 @@ public interface KafkaClientComponent {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
- .name("sasl.kerberos.principal")
- .displayName("Kerberos Principal")
- .description("Principal used for authentication with Kerberos")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
- .name("sasl.kerberos.keytab")
- .displayName("Kerberos Keytab")
- .description("Keytab credentials used for authentication with
Kerberos")
- .required(false)
- .identifiesExternalResource(ResourceCardinality.SINGLE,
ResourceType.FILE)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
- .name("kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Service supporting generalized credentials
authentication with Kerberos")
- .identifiesControllerService(KerberosCredentialsService.class)
- .required(false)
- .build();
-
PropertyDescriptor SELF_CONTAINED_KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosCredentialsLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosCredentialsLoginConfigProvider.java
deleted file mode 100644
index 27cdf1bec5..0000000000
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosCredentialsLoginConfigProvider.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.kafka.shared.login;
-
-import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-
-/**
- * Kerberos Login Module implementation of configuration provider
- */
-public class KerberosCredentialsLoginConfigProvider implements
LoginConfigProvider {
- private static final String MODULE_CLASS_NAME =
"com.sun.security.auth.module.Krb5LoginModule";
-
- private static final String FORMAT = "%s required renewTicket=true
useKeyTab=true serviceName=\"%s\" principal=\"%s\" keyTab=\"%s\";";
-
- /**
- * Get JAAS configuration using configured Kerberos credentials
- *
- * @param context Property Context
- * @return JAAS configuration with Kerberos Login Module
- */
- @Override
- public String getConfiguration(final PropertyContext context) {
- final String principal;
- final String keyTab;
-
- final KerberosCredentialsService credentialsService =
context.getProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- if (credentialsService == null) {
- principal =
context.getProperty(KafkaClientComponent.KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
- keyTab =
context.getProperty(KafkaClientComponent.KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
- } else {
- principal = credentialsService.getPrincipal();
- keyTab = credentialsService.getKeytab();
- }
-
- final String serviceName =
context.getProperty(KafkaClientComponent.KERBEROS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
- return String.format(FORMAT, MODULE_CLASS_NAME, serviceName,
principal, keyTab);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosDelegatingLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosDelegatingLoginConfigProvider.java
index d811fb3307..88d3afecc7 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosDelegatingLoginConfigProvider.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosDelegatingLoginConfigProvider.java
@@ -16,16 +16,12 @@
*/
package org.apache.nifi.kafka.shared.login;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
/**
* Kerberos Delegating Login Module implementation of configuration provider
*/
public class KerberosDelegatingLoginConfigProvider implements
LoginConfigProvider {
- private static final LoginConfigProvider CREDENTIALS_PROVIDER = new
KerberosCredentialsLoginConfigProvider();
-
private static final LoginConfigProvider USER_SERVICE_PROVIDER = new
KerberosUserServiceLoginConfigProvider();
/**
@@ -36,15 +32,6 @@ public class KerberosDelegatingLoginConfigProvider
implements LoginConfigProvide
*/
@Override
public String getConfiguration(final PropertyContext context) {
- final PropertyValue userServiceProperty =
context.getProperty(KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE);
-
- final String configuration;
- if (userServiceProperty.isSet()) {
- configuration = USER_SERVICE_PROVIDER.getConfiguration(context);
- } else {
- configuration = CREDENTIALS_PROVIDER.getConfiguration(context);
- }
-
- return configuration;
+ return USER_SERVICE_PROVIDER.getConfiguration(context);
}
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
index e36264f2af..50456a2775 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
@@ -24,7 +24,6 @@ import
org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import java.util.ArrayList;
@@ -34,9 +33,6 @@ import java.util.List;
import java.util.Optional;
import java.util.function.Function;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_KEYTAB;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_PRINCIPAL;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.KERBEROS_SERVICE_NAME;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_PASSWORD;
@@ -51,8 +47,6 @@ public class KafkaClientCustomValidationFunction implements
Function<ValidationC
static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG =
"java.security.auth.login.config";
- private static final String ALLOW_EXPLICIT_KEYTAB =
"NIFI_ALLOW_EXPLICIT_KEYTAB";
-
private static final String JNDI_LOGIN_MODULE_CLASS = "JndiLoginModule";
private static final String JND_LOGIN_MODULE_EXPLANATION = "The
JndiLoginModule is not allowed in the JAAS configuration";
@@ -72,7 +66,6 @@ public class KafkaClientCustomValidationFunction implements
Function<ValidationC
public Collection<ValidationResult> apply(final ValidationContext
validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();
validateLoginModule(validationContext, results);
- validateKerberosServices(validationContext, results);
validateKerberosCredentials(validationContext, results);
validateUsernamePassword(validationContext, results);
validateAwsMskIamMechanism(validationContext, results);
@@ -100,67 +93,6 @@ public class KafkaClientCustomValidationFunction implements
Function<ValidationC
}
}
- private void validateKerberosServices(final ValidationContext
validationContext, final Collection<ValidationResult> results) {
- final PropertyValue userServiceProperty =
validationContext.getProperty(SELF_CONTAINED_KERBEROS_USER_SERVICE);
- final PropertyValue credentialsServiceProperty =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE);
- final String principal =
validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
- final String keyTab =
validationContext.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
-
- if (userServiceProperty.isSet()) {
- if (credentialsServiceProperty.isSet()) {
- final String explanation = String.format("Cannot configure
both [%s] and [%s]",
- SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName(),
- KERBEROS_CREDENTIALS_SERVICE.getDisplayName()
- );
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- }
-
- if (isNotEmpty(principal) || isNotEmpty(keyTab)) {
- final String explanation = String.format("Cannot configure
[%s] with [%s] or [%s]",
- SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName(),
- KERBEROS_PRINCIPAL.getDisplayName(),
- KERBEROS_KEYTAB.getDisplayName()
- );
- results.add(new ValidationResult.Builder()
-
.subject(SELF_CONTAINED_KERBEROS_USER_SERVICE.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- }
- } else if (credentialsServiceProperty.isSet()) {
- if (isNotEmpty(principal) || isNotEmpty(keyTab)) {
- final String explanation = String.format("Cannot configure
[%s] with [%s] or [%s]",
- KERBEROS_CREDENTIALS_SERVICE.getDisplayName(),
- KERBEROS_PRINCIPAL.getDisplayName(),
- KERBEROS_KEYTAB.getDisplayName()
- );
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- }
- }
-
- final String allowExplicitKeytab =
System.getenv(ALLOW_EXPLICIT_KEYTAB);
- if (Boolean.FALSE.toString().equalsIgnoreCase(allowExplicitKeytab) &&
(isNotEmpty(principal) || isNotEmpty(keyTab))) {
- final String explanation = String.format("Environment Variable
[%s] disables configuring [%s] and [%s] properties",
- ALLOW_EXPLICIT_KEYTAB,
- KERBEROS_PRINCIPAL.getDisplayName(),
- KERBEROS_KEYTAB.getDisplayName()
- );
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_PRINCIPAL.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- }
- }
-
private void validateKerberosCredentials(final ValidationContext
validationContext, final Collection<ValidationResult> results) {
final String saslMechanism =
validationContext.getProperty(SASL_MECHANISM).getValue();
final String securityProtocol =
validationContext.getProperty(SECURITY_PROTOCOL).getValue();
@@ -176,29 +108,10 @@ public class KafkaClientCustomValidationFunction
implements Function<ValidationC
.build());
}
- final String principal =
validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
- final String keyTab =
validationContext.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final String systemLoginConfig =
System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
- if (isEmpty(principal) && isNotEmpty(keyTab)) {
- final String explanation = String.format("[%s] required when
configuring [%s]", KERBEROS_KEYTAB.getDisplayName(),
KERBEROS_PRINCIPAL.getDisplayName());
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_PRINCIPAL.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- } else if (isNotEmpty(principal) && isEmpty(keyTab)) {
- final String explanation = String.format("[%s] required when
configuring [%s]", KERBEROS_PRINCIPAL.getDisplayName(),
KERBEROS_KEYTAB.getDisplayName());
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_KEYTAB.getDisplayName())
- .valid(false)
- .explanation(explanation)
- .build());
- }
-
final KerberosUserService userService =
validationContext.getProperty(SELF_CONTAINED_KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
- final KerberosCredentialsService credentialsService =
validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- if (userService == null && credentialsService == null &&
isEmpty(principal) && isEmpty(keyTab) && isEmpty(systemLoginConfig)) {
+ if (userService == null && isEmpty(systemLoginConfig)) {
final String explanation = String.format("Kerberos Credentials
not found in component properties or System Property [%s]",
JAVA_SECURITY_AUTH_LOGIN_CONFIG);
results.add(new ValidationResult.Builder()
.subject(SASL_MECHANISM.getDisplayName())
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunctionTest.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunctionTest.java
index 9bebcff102..6f43ad9530 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunctionTest.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunctionTest.java
@@ -98,39 +98,6 @@ class KafkaClientCustomValidationFunctionTest {
assertTrue(results.isEmpty());
}
- @Test
- void testApplyUserServiceWithCredentialsServiceInvalid() {
-
runner.setProperty(KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE,
KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE.getName());
- runner.setProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE,
KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getName());
-
- final ValidationContext validationContext = getValidationContext();
- final Collection<ValidationResult> results =
validationFunction.apply(validationContext);
-
- assertPropertyValidationResultFound(results,
KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getDisplayName());
- }
-
- @Test
- void testApplyCredentialsServiceWithPrincipalInvalid() {
- runner.setProperty(KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE,
KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getName());
- runner.setProperty(KafkaClientComponent.KERBEROS_PRINCIPAL,
KafkaClientComponent.KERBEROS_PRINCIPAL.getName());
-
- final ValidationContext validationContext = getValidationContext();
- final Collection<ValidationResult> results =
validationFunction.apply(validationContext);
-
- assertPropertyValidationResultFound(results,
KafkaClientComponent.KERBEROS_CREDENTIALS_SERVICE.getDisplayName());
- }
-
- @Test
- void testApplyPrincipalKeyTabValid() {
- runner.setProperty(KafkaClientComponent.KERBEROS_PRINCIPAL,
KafkaClientComponent.KERBEROS_PRINCIPAL.getName());
- runner.setProperty(KafkaClientComponent.KERBEROS_KEYTAB,
KafkaClientComponent.KERBEROS_KEYTAB.getName());
-
- final ValidationContext validationContext = getValidationContext();
- final Collection<ValidationResult> results =
validationFunction.apply(validationContext);
-
- assertTrue(results.isEmpty());
- }
-
@Test
void testApplyPlainUsernameWithoutPasswordInvalid() {
runner.setProperty(KafkaClientComponent.SASL_USERNAME,
KafkaClientComponent.SASL_USERNAME.getName());