exceptionfactory commented on a change in pull request #5277:
URL: https://github.com/apache/nifi/pull/5277#discussion_r691202562
##########
File path: nifi-assembly/pom.xml
##########
@@ -742,6 +742,12 @@ language governing permissions and limitations under the
License. -->
<version>1.15.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
Review comment:
It looks like the indentation of this line is slightly off.
##########
File path: nifi-commons/nifi-security-kerberos/pom.xml
##########
@@ -26,6 +26,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
Review comment:
It looks like the indentation of this line could be adjusted.
```suggestion
<dependency>
```
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -232,6 +251,22 @@ protected KerberosProperties getKerberosProperties(File
kerberosConfigFile) {
.build());
}
+ if (kerberosUserService != null && (explicitPrincipal != null ||
explicitKeytab != null || explicitPassword != null)) {
+ results.add(new ValidationResult.Builder()
+ .subject("Kerberos User")
+ .valid(false)
+ .explanation("Cannot specify a Kerberos User Service while
also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
+ .build());
+ }
+
+ if (kerberosUserService != null && credentialsService != null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Kerberos User")
+ .valid(false)
+ .explanation("Cannot specify a Kerberos User Service while
also specifying a Kerberos Credential Service")
Review comment:
Minor wording adjustment to match service name:
```suggestion
.explanation("Cannot specify a Kerberos User Service
while also specifying a Kerberos Credentials Service")
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
##########
@@ -199,6 +203,13 @@
.identifiesControllerService(KerberosCredentialsService.class)
.required(false)
.build();
+ public static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
Review comment:
With this being a public variable, is it worth renaming to
`SELF_CONTAINED_KERBEROS_USER_SERVICE` to indicate the more specific type of
`KerberosUserService` it identifies?
##########
File path:
nifi-nar-bundles/nifi-standard-services/nifi-kerberos-user-service-bundle/nifi-kerberos-user-service/.gitignore
##########
@@ -0,0 +1 @@
+/bin/
Review comment:
Should the `.gitignore` entry be set at a higher parent directory?
##########
File path:
nifi-commons/nifi-security-kerberos/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
##########
@@ -75,7 +87,9 @@ public synchronized void login() throws LoginException {
// other classes may be referencing an existing subject
and replacing it may break functionality of those other classes after relogin
this.subject = new Subject();
}
- this.loginContext = createLoginContext(subject);
+
+ // the Configuration implementations have only one config
entry and always return it regardless of the passed in name
+ this.loginContext = new LoginContext("KerberosUser", subject,
createCallbackHandler(), createConfiguration());
Review comment:
It looks like the name `KerberosUser` has special meaning and is reused
elsewhere in this class, so it would be helpful to pull it out to a static
variable.
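A minimal sketch of what that could look like, not taken from the PR: the
constant name `LOGIN_CONTEXT_NAME` and the stand-in `Configuration` are
assumptions for illustration. The stand-in mirrors the comment in the diff,
returning its single entry regardless of the name passed in.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import java.util.Collections;

class LoginContextNameSketch {
    // Hypothetical constant name; the PR currently repeats the literal "KerberosUser"
    static final String LOGIN_CONTEXT_NAME = "KerberosUser";

    // Stand-in Configuration with one entry that is returned for any name,
    // as described by the comment in the diff above
    static Configuration singleEntryConfiguration() {
        final AppConfigurationEntry entry = new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                Collections.<String, Object>emptyMap());
        return new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                return new AppConfigurationEntry[] {entry};
            }
        };
    }
}
```

With the constant in place, the `LoginContext` constructor call and any other
use in the class can reference the same name instead of repeating the literal.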
##########
File path:
nifi-commons/nifi-security-kerberos/src/test/java/org/apache/nifi/security/krb/TestKerberosTicketCacheUser.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class TestKerberosTicketCacheUser {
+
+ @Test
+ public void testGetConfigurationEntry() {
+ final String principal = "[email protected]";
+
+ final KerberosUser kerberosUser = new
KerberosTicketCacheUser(principal);
+ assertEquals(principal, kerberosUser.getPrincipal());
+
+ final AppConfigurationEntry entry =
kerberosUser.getConfigurationEntry();
+ assertNotNull(entry);
+ assertEquals(ConfigurationUtil.SUN_KRB5_LOGIN_MODULE,
entry.getLoginModuleName());
+ assertEquals(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
entry.getControlFlag());
+ assertEquals(principal, entry.getOptions().get("principal"));
+ assertEquals("true", entry.getOptions().get("useTicketCache"));
+ assertNull(entry.getOptions().get("ticketCache"));
+ }
+
+ @Test
+ public void testGetConfigurationEntryWithSpecificTicketCache() {
+ final String principal = "[email protected]";
+ final String ticketCache = "/tmp/cache";
Review comment:
Is this just a placeholder value, or is the directory actually used? If
it is used, it would be better to replace it with a base directory using
`java.io.tmpdir`.
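If the path is actually used, something along these lines might work. This is
only a sketch: the class and method names are hypothetical, and the file name
`cache` is simply carried over from the test above.

```java
import java.nio.file.Paths;

class TicketCachePathSketch {
    // Builds a ticket cache path under the JVM's temporary directory
    // instead of hard-coding /tmp (hypothetical helper, not part of the PR)
    static String ticketCachePath(final String fileName) {
        final String tmpDir = System.getProperty("java.io.tmpdir");
        return Paths.get(tmpDir, fileName).toString();
    }
}
```

The test could then declare
`final String ticketCache = TicketCachePathSketch.ticketCachePath("cache");`
and assert against that value.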
##########
File path: nifi-commons/nifi-security-kerberos-api/pom.xml
##########
@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-commons</artifactId>
+ <version>1.15.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-security-kerberos-api</artifactId>
+ <dependencies>
+ </dependencies>
Review comment:
Should this element be removed since there are no dependencies?
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -321,6 +356,15 @@ public final void abstractOnStopped() {
}
}
+ final KerberosUser kerberosUser = resources.getKerberosUser();
+ if (kerberosUser != null) {
+ try {
+ kerberosUser.logout();
+ } catch (LoginException e) {
+ getLogger().warn("Error logging out KerberosUser: " +
e.getMessage(), e);
Review comment:
This could be adjusted to use placeholders instead of concatenation. Is
there value in including the KerberosUser name in the message?
```suggestion
getLogger().warn("Error logging out KerberosUser: {}",
e.getMessage(), e);
```
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -212,8 +224,15 @@ protected KerberosProperties getKerberosProperties(File
kerberosConfigFile) {
try {
final Configuration conf =
getHadoopConfigurationForValidation(locations);
-
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
- this.getClass().getSimpleName(), conf, resolvedPrincipal,
resolvedKeytab, explicitPassword, getLogger()));
+ if (kerberosUserService == null) {
+
results.addAll(KerberosProperties.validatePrincipalWithKeytabOrPassword(
+ this.getClass().getSimpleName(), conf,
resolvedPrincipal, resolvedKeytab, explicitPassword, getLogger()));
+ } else {
+ final boolean securityEnabled =
SecurityUtil.isSecurityEnabled(conf);
+ if (!securityEnabled) {
+ getLogger().warn("Configuration does not have security
enabled, KerberosUserService will be ignored");
Review comment:
For clarity, what do you think about prefixing the message with `Hadoop`?
```suggestion
getLogger().warn("Hadoop Configuration does not have
security enabled, KerberosUserService will be ignored");
```
##########
File path:
nifi-commons/nifi-security-kerberos/src/main/java/org/apache/nifi/security/krb/AbstractKerberosUser.java
##########
@@ -212,13 +260,16 @@ private boolean isTGSPrincipal(final KerberosPrincipal
principal) {
private long getRefreshTime(final KerberosTicket tgt) {
long start = tgt.getStartTime().getTime();
long end = tgt.getEndTime().getTime();
+ long renewUntil = tgt.getRenewTill().getTime();
Review comment:
Can this be final?
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
##########
@@ -450,6 +476,36 @@ HdfsResources resetHDFSResources(final List<String>
resourceLocations, ProcessCo
return new HdfsResources(config, fs, ugi, kerberosUser);
}
+ private KerberosUser getKerberosUser(final ProcessContext context) throws
IOException {
+ // Check Kerberos User Service first, if present then get the
KerberosUser from the service
+ // The customValidate method ensures that KerberosUserService can't be
set at the same time as the credentials service or explicit properties
+ final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+ if (kerberosUserService != null) {
+ return kerberosUserService.createKerberosUser();
+ }
+
+ // Kerberos User Service wasn't set, so create KerberosUser based on
credentials service or explicit properties...
+ String principal =
context.getProperty(kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
+ String keyTab =
context.getProperty(kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
+ String password =
context.getProperty(kerberosProperties.getKerberosPassword()).getValue();
+
+ // If the Kerberos Credentials Service is specified, we need to use
its configuration, not the explicit properties for principal/keytab.
+ // The customValidate method ensures that only one can be set, so we
know that the principal & keytab above are null.
+ final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ if (credentialsService != null) {
+ principal = credentialsService.getPrincipal();
+ keyTab = credentialsService.getKeytab();
+ }
+
+ if (keyTab != null) {
+ return new KerberosKeytabUser(principal, keyTab);
+ } else if (password != null) {
+ return new KerberosPasswordUser(principal, password);
+ } else {
+ throw new IOException("Unable to authenticate with Kerberos, no
keytab or password was provided");
Review comment:
It looks like this is a carry-over from the previous implementation, but
`IOException` doesn't seem like the best option. Although it would change the
signature, what do you think about `IllegalArgumentException`, or perhaps some
other exception that indicates a configuration problem?
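To illustrate the idea only, here is a sketch using stand-in types rather
than the real `KerberosKeytabUser` and `KerberosPasswordUser` classes. The
`CredentialType` enum and the `select` method are hypothetical; the message
text is the one from the diff.

```java
class KerberosCredentialSelectionSketch {
    // Stand-in for the concrete KerberosUser implementations chosen in the diff
    enum CredentialType { KEYTAB, PASSWORD }

    // Same branching as getKerberosUser, but a missing keytab and password is
    // reported as a configuration problem instead of an IOException
    static CredentialType select(final String keytab, final String password) {
        if (keytab != null) {
            return CredentialType.KEYTAB;
        } else if (password != null) {
            return CredentialType.PASSWORD;
        } else {
            throw new IllegalArgumentException(
                    "Unable to authenticate with Kerberos, no keytab or password was provided");
        }
    }
}
```

Because `IllegalArgumentException` is unchecked, callers that currently catch
`IOException` would need to be reviewed if this change were made.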
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used,
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+ private static final Logger log =
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+ private Thread t;
Review comment:
Recommend renaming this variable for clarity:
```suggestion
private Thread refreshThread;
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
##########
@@ -284,6 +314,16 @@
+ "must be set or neither must be set.")
.build());
}
+
+ final String jvmJaasConfigFile =
System.getProperty("java.security.auth.login.config");
Review comment:
For clarity, it might be helpful to declare the System property name as
a static variable.
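For example, roughly as follows. The constant name and the helper method are
assumptions for illustration, and the blank check is an addition that the PR
may not need.

```java
class JaasConfigPropertySketch {
    // Hypothetical constant for the standard JVM JAAS configuration property
    static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";

    // Returns true when the JVM was started with a non-blank JAAS config file location
    static boolean isJvmJaasConfigSet() {
        final String jvmJaasConfigFile = System.getProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG);
        return jvmJaasConfigFile != null && !jvmJaasConfigFile.trim().isEmpty();
    }
}
```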
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used,
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+ private static final Logger log =
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+ private Thread t;
+ private boolean isKrbTicket;
+
+ private String principal;
+
+ private double ticketRenewWindowFactor;
+ private long minTimeBeforeRelogin;
+
+ private volatile Subject subject;
+
+ private LoginContext loginContext;
+ private String serviceName;
+
+ @Override
+ public void configure(Map<String, ?> configs, String contextName,
Configuration configuration,
+ AuthenticateCallbackHandler callbackHandler) {
+ super.configure(configs, contextName, configuration, callbackHandler);
+ this.ticketRenewWindowFactor = (Double)
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+ this.minTimeBeforeRelogin = (Long)
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+ this.serviceName = getServiceName(configs, contextName, configuration);
+ }
+
+ /**
+ * Performs login for each login module specified for the login context of
this instance and starts the thread used
+ * to periodically re-login to the Kerberos Ticket Granting Server.
+ */
+ @Override
+ public LoginContext login() throws LoginException {
+ loginContext = super.login();
+ subject = loginContext.getSubject();
+ isKrbTicket =
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+ AppConfigurationEntry[] entries =
configuration().getAppConfigurationEntry(contextName());
+ if (entries.length == 0) {
+ principal = null;
+ } else {
+ // there will only be a single entry
+ AppConfigurationEntry entry = entries[0];
+ if (entry.getOptions().get("principal") != null)
+ principal = (String) entry.getOptions().get("principal");
+ else
+ principal = null;
+ }
+
+ if (!isKrbTicket) {
+ log.debug("[Principal={}]: It is not a Kerberos ticket",
principal);
+ t = null;
+ // if no TGT, do not bother with ticket management.
+ return loginContext;
+ }
+ log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+ t =
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s",
principal), () -> {
+ log.info("[Principal={}]: TGT refresh thread started,
minTimeBeforeRelogin = {}", principal, minTimeBeforeRelogin);
+ while (true) {
+ try {
+ Thread.sleep(minTimeBeforeRelogin);
+ } catch (InterruptedException ie) {
+ log.warn("[Principal={}]: TGT renewal thread has been
interrupted and will exit.", principal);
+ return;
+ }
+ try {
+ checkTGTAndReLogin();
+ } catch (Throwable t) {
+ log.error("[Principal={}]: Error from TGT refresh thread",
principal, t);
+ }
+ }
+ });
+ t.start();
+ return loginContext;
+ }
+
+ @Override
+ public void close() {
+ if ((t != null) && (t.isAlive())) {
+ t.interrupt();
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ log.warn("[Principal={}]: Error while waiting for Login thread
to shutdown.", principal, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public Subject subject() {
+ return subject;
+ }
+
+ @Override
+ public String serviceName() {
+ return serviceName;
+ }
+
+ private synchronized void checkTGTAndReLogin() throws LoginException {
+ final KerberosTicket tgt = getTGT();
+ if (tgt == null) {
+ log.info("[Principal={}]: TGT was not found, performing login",
principal);
+ reLogin();
+ return;
+ }
+
+ if (System.currentTimeMillis() < getRefreshTime(tgt)) {
+ log.debug("[Principal={}]: TGT was found, but has not reached
expiration window", principal);
+ return;
+ }
+
+ try {
+ tgt.refresh();
+ log.info("[Principal={}]: TGT refreshed", principal);
+ getRefreshTime(tgt);
+ } catch (RefreshFailedException rfe) {
+ log.warn("[Principal={}]: TGT refresh failed, will attempt
relogin", principal);
+ log.debug("", rfe);
+ reLogin();
+ }
+ }
+
+ private static String getServiceName(Map<String, ?> configs, String
contextName, Configuration configuration) {
+ List<AppConfigurationEntry> configEntries =
Arrays.asList(configuration.getAppConfigurationEntry(contextName));
+ String jaasServiceName = JaasContext.configEntryOption(configEntries,
JaasUtils.SERVICE_NAME, null);
+ String configServiceName = (String)
configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+ if (jaasServiceName != null && configServiceName != null &&
!jaasServiceName.equals(configServiceName)) {
+ String message = String.format("Conflicting serviceName values
found in JAAS and Kafka configs " +
+ "value in JAAS file %s, value in Kafka config %s",
jaasServiceName, configServiceName);
+ throw new IllegalArgumentException(message);
+ }
+
+ if (jaasServiceName != null)
+ return jaasServiceName;
+ if (configServiceName != null)
+ return configServiceName;
+
+ throw new IllegalArgumentException("No serviceName defined in either
JAAS or Kafka config");
+ }
+
+
+ private long getRefreshTime(final KerberosTicket tgt) {
+ long start = tgt.getStartTime().getTime();
+ long expires = tgt.getEndTime().getTime();
+
+ log.debug("[Principal={}]: TGT valid starting at: {}", principal,
tgt.getStartTime());
+ log.debug("[Principal={}]: TGT expires: {}", principal,
tgt.getEndTime());
+ log.debug("[Principal={}]: TGT renew until: {}", principal,
tgt.getRenewTill());
+
+ return start + (long) ((expires - start) * ticketRenewWindowFactor);
+ }
+
+ private KerberosTicket getTGT() {
+ Set<KerberosTicket> tickets =
subject.getPrivateCredentials(KerberosTicket.class);
+ for (KerberosTicket ticket : tickets) {
+ KerberosPrincipal server = ticket.getServer();
+ if (server.getName().equals("krbtgt/" + server.getRealm() + "@" +
server.getRealm())) {
Review comment:
This would be easier to follow if the value for comparison were formed
on a separate line:
```suggestion
final String expectedServerName = String.format("krbtgt/%s@%s",
server.getRealm(), server.getRealm());
if (server.getName().equals(expectedServerName)) {
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used,
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+ private static final Logger log =
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+ private Thread t;
+ private boolean isKrbTicket;
+
+ private String principal;
+
+ private double ticketRenewWindowFactor;
+ private long minTimeBeforeRelogin;
+
+ private volatile Subject subject;
+
+ private LoginContext loginContext;
+ private String serviceName;
+
+ @Override
+ public void configure(Map<String, ?> configs, String contextName,
Configuration configuration,
+ AuthenticateCallbackHandler callbackHandler) {
+ super.configure(configs, contextName, configuration, callbackHandler);
+ this.ticketRenewWindowFactor = (Double)
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+ this.minTimeBeforeRelogin = (Long)
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+ this.serviceName = getServiceName(configs, contextName, configuration);
+ }
+
+ /**
+ * Performs login for each login module specified for the login context of
this instance and starts the thread used
+ * to periodically re-login to the Kerberos Ticket Granting Server.
+ */
+ @Override
+ public LoginContext login() throws LoginException {
+ loginContext = super.login();
+ subject = loginContext.getSubject();
+ isKrbTicket =
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+ AppConfigurationEntry[] entries =
configuration().getAppConfigurationEntry(contextName());
+ if (entries.length == 0) {
+ principal = null;
+ } else {
+ // there will only be a single entry
+ AppConfigurationEntry entry = entries[0];
+ if (entry.getOptions().get("principal") != null)
+ principal = (String) entry.getOptions().get("principal");
+ else
+ principal = null;
+ }
+
+ if (!isKrbTicket) {
+ log.debug("[Principal={}]: It is not a Kerberos ticket",
principal);
+ t = null;
+ // if no TGT, do not bother with ticket management.
+ return loginContext;
+ }
+ log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+ t =
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s",
principal), () -> {
+ log.info("[Principal={}]: TGT refresh thread started,
minTimeBeforeRelogin = {}", principal, minTimeBeforeRelogin);
+ while (true) {
+ try {
+ Thread.sleep(minTimeBeforeRelogin);
+ } catch (InterruptedException ie) {
+ log.warn("[Principal={}]: TGT renewal thread has been
interrupted and will exit.", principal);
+ return;
+ }
+ try {
+ checkTGTAndReLogin();
+ } catch (Throwable t) {
+ log.error("[Principal={}]: Error from TGT refresh thread",
principal, t);
+ }
+ }
+ });
+ t.start();
+ return loginContext;
+ }
+
+ @Override
+ public void close() {
+ if ((t != null) && (t.isAlive())) {
+ t.interrupt();
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ log.warn("[Principal={}]: Error while waiting for Login thread
to shutdown.", principal, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public Subject subject() {
+ return subject;
+ }
+
+ @Override
+ public String serviceName() {
+ return serviceName;
+ }
+
+ private synchronized void checkTGTAndReLogin() throws LoginException {
+ final KerberosTicket tgt = getTGT();
+ if (tgt == null) {
+ log.info("[Principal={}]: TGT was not found, performing login",
principal);
+ reLogin();
+ return;
+ }
+
+ if (System.currentTimeMillis() < getRefreshTime(tgt)) {
+ log.debug("[Principal={}]: TGT was found, but has not reached
expiration window", principal);
+ return;
+ }
+
+ try {
+ tgt.refresh();
+ log.info("[Principal={}]: TGT refreshed", principal);
+ getRefreshTime(tgt);
+ } catch (RefreshFailedException rfe) {
+ log.warn("[Principal={}]: TGT refresh failed, will attempt
relogin", principal);
+ log.debug("", rfe);
+ reLogin();
+ }
+ }
+
+ private static String getServiceName(Map<String, ?> configs, String
contextName, Configuration configuration) {
+ List<AppConfigurationEntry> configEntries =
Arrays.asList(configuration.getAppConfigurationEntry(contextName));
+ String jaasServiceName = JaasContext.configEntryOption(configEntries,
JaasUtils.SERVICE_NAME, null);
+ String configServiceName = (String)
configs.get(SaslConfigs.SASL_KERBEROS_SERVICE_NAME);
+ if (jaasServiceName != null && configServiceName != null &&
!jaasServiceName.equals(configServiceName)) {
+ String message = String.format("Conflicting serviceName values
found in JAAS and Kafka configs " +
+ "value in JAAS file %s, value in Kafka config %s",
jaasServiceName, configServiceName);
+ throw new IllegalArgumentException(message);
+ }
+
+ if (jaasServiceName != null)
+ return jaasServiceName;
+ if (configServiceName != null)
+ return configServiceName;
Review comment:
Recommend using standard braces as opposed to the shortened conditional
form.
```suggestion
if (jaasServiceName != null) {
return jaasServiceName;
}
if (configServiceName != null) {
return configServiceName;
}
```
##########
File path:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/CustomKerberosLogin.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.JaasContext;
+import org.apache.kafka.common.security.JaasUtils;
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.kerberos.KerberosLogin;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Customized version of {@link
org.apache.kafka.common.security.kerberos.KerberosLogin} which improves the
re-login logic
+ * to avoid making system calls to kinit when the ticket cache is being used,
and to avoid exiting the refresh thread so that
+ * it may recover if the ticket cache is externally refreshed.
+ *
+ * The re-login thread follows a similar approach used by NiFi's KerberosUser
which attempts to call tgt.refresh()
+ * and falls back to a logout/login.
+ *
+ * The Kafka client is configured to use this login by setting
SaslConfigs.SASL_LOGIN_CLASS in {@link KafkaProcessorUtils}
+ * when the SASL mechanism is GSSAPI.
+ */
+public class CustomKerberosLogin extends AbstractLogin {
+ private static final Logger log =
LoggerFactory.getLogger(CustomKerberosLogin.class);
+
+ private Thread t;
+ private boolean isKrbTicket;
+
+ private String principal;
+
+ private double ticketRenewWindowFactor;
+ private long minTimeBeforeRelogin;
+
+ private volatile Subject subject;
+
+ private LoginContext loginContext;
+ private String serviceName;
+
+ @Override
+ public void configure(Map<String, ?> configs, String contextName,
Configuration configuration,
+ AuthenticateCallbackHandler callbackHandler) {
+ super.configure(configs, contextName, configuration, callbackHandler);
+ this.ticketRenewWindowFactor = (Double)
configs.get(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR);
+ this.minTimeBeforeRelogin = (Long)
configs.get(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN);
+ this.serviceName = getServiceName(configs, contextName, configuration);
+ }
+
+ /**
+ * Performs login for each login module specified for the login context of
this instance and starts the thread used
+ * to periodically re-login to the Kerberos Ticket Granting Server.
+ */
+ @Override
+ public LoginContext login() throws LoginException {
+ loginContext = super.login();
+ subject = loginContext.getSubject();
+ isKrbTicket =
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty();
+
+ AppConfigurationEntry[] entries =
configuration().getAppConfigurationEntry(contextName());
+ if (entries.length == 0) {
+ principal = null;
+ } else {
+ // there will only be a single entry
+ AppConfigurationEntry entry = entries[0];
+ if (entry.getOptions().get("principal") != null)
+ principal = (String) entry.getOptions().get("principal");
+ else
+ principal = null;
+ }
+
+ if (!isKrbTicket) {
+ log.debug("[Principal={}]: It is not a Kerberos ticket",
principal);
+ t = null;
+ // if no TGT, do not bother with ticket management.
+ return loginContext;
+ }
+ log.debug("[Principal={}]: It is a Kerberos ticket", principal);
+
+ t =
KafkaThread.daemon(String.format("kafka-kerberos-refresh-thread-%s",
principal), () -> {
Review comment:
Is the `login()` method called only once in the lifecycle of this class?
This line overwrites the previous Thread reference without interrupting that
thread, so a repeated call could leave the earlier refresh thread running.
In addition, it might be helpful to refactor this approach into an explicit
`Runnable` and run it with an `ExecutorService`, as opposed to creating a new
Thread.
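To illustrate the `Runnable` plus `ExecutorService` idea, here is a rough sketch
rather than a concrete proposal. `RefreshSchedulerSketch`, `start()`, `stop()`,
and `runSafely()` are hypothetical names that do not exist in this pull
request, and the fixed delay stands in for the refresh-time calculation. It
assumes `login()` would call `start()` and `close()` would call `stop()`.
```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Hypothetical sketch: class and method names are illustrative, not from the PR.
class RefreshSchedulerSketch {
    private final Runnable refreshTask;
    private final long delayMillis;
    private ScheduledExecutorService executor;

    RefreshSchedulerSketch(Runnable refreshTask, long delayMillis) {
        this.refreshTask = refreshTask;
        this.delayMillis = delayMillis;
    }

    // Assumed to be called from login(); a repeated call replaces the old executor.
    synchronized void start() {
        stop();
        executor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "kerberos-refresh-sketch");
            thread.setDaemon(true);
            return thread;
        });
        executor.scheduleWithFixedDelay(this::runSafely, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Assumed to be called from close(); interrupts the worker thread if it is running.
    synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }

    synchronized boolean isRunning() {
        return executor != null;
    }

    private void runSafely() {
        try {
            refreshTask.run();
        } catch (RuntimeException e) {
            // An uncaught exception would cancel all later runs of a scheduled task.
            System.err.println("Refresh attempt failed: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        RefreshSchedulerSketch scheduler = new RefreshSchedulerSketch(runs::incrementAndGet, 20L);
        scheduler.start();
        scheduler.start(); // second login(): the first executor is shut down, not leaked
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
        scheduler.stop();
        System.out.println("Refresh task ran " + runs.get() + " time(s)");
    }
}
```
With this shape, a second `login()` shuts down the earlier executor before
creating a new one, and a failed refresh attempt is logged without ending the
schedule.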
##########
File path:
nifi-nar-bundles/nifi-standard-services/nifi-kerberos-user-service-bundle/nifi-kerberos-user-service/src/main/java/org/apache/nifi/kerberos/KerberosTicketCacheUserService.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kerberos;
+
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.Restriction;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.RequiredPermission;
+import org.apache.nifi.components.resource.ResourceCardinality;
+import org.apache.nifi.components.resource.ResourceType;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.security.krb.KerberosTicketCacheUser;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import java.util.Collections;
+import java.util.List;
+
+@CapabilityDescription("Provides a mechanism for creating a KerberosUser from
a principal and ticket cache that other components " +
+ "are able to use in order to perform authentication using Kerberos. By
encapsulating this information into a Controller Service " +
+ "and allowing other components to make use of it an administrator is
able to choose which users are allowed to use which ticket " +
+ "caches and principals. This provides a more robust security model for
multi-tenant use cases.")
+@Tags({"Kerberos", "Ticket", "Cache", "Principal", "Credentials",
"Authentication", "Security"})
+@Restricted(restrictions = {
+ @Restriction(requiredPermission =
RequiredPermission.ACCESS_TICKET_CACHE,
+ explanation = "Allows user to define a ticket cache and
principal that can then be used by other components.")
+})
+public class KerberosTicketCacheUserService extends
AbstractKerberosUserService implements SelfContainedKerberosUserService {
+
+ static final PropertyDescriptor TICKET_CACHE_FILE = new
PropertyDescriptor.Builder()
+ .name("Kerberos Ticket Cache File")
+ .description("Kerberos ticket cache associated with the
principal.")
+ .identifiesExternalResource(ResourceCardinality.SINGLE,
ResourceType.FILE)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .build();
+
+ private volatile String ticketCache;
Review comment:
Should this be named something like `ticketCachePath` or
`ticketCacheLocation`?