This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 66a9c4968d9 [FLINK-28000][runtime][security] Throw exception when principal is set in the configuration without keytab
66a9c4968d9 is described below
commit 66a9c4968d90a545d10e03c3c5684da335ec451b
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Jun 13 10:19:47 2022 +0200
[FLINK-28000][runtime][security] Throw exception when principal is set in the configuration without keytab
---
.../runtime/security/SecurityConfiguration.java | 26 ++++----
.../security/SecurityConfigurationTest.java | 75 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 14 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
index 15b81e9551d..4db3e3a6139 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
@@ -40,6 +40,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class SecurityConfiguration {
+ private static final String KERBEROS_CONFIG_ERROR_PREFIX =
+ "Kerberos login configuration is invalid: ";
+
private final List<String> securityContextFactory;
private final List<String> securityModuleFactories;
@@ -82,6 +85,7 @@ public class SecurityConfiguration {
Configuration flinkConf,
List<String> securityContextFactory,
List<String> securityModuleFactories) {
+ this.flinkConfig = checkNotNull(flinkConf);
this.isZkSaslDisable =
flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
this.keytab =
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
this.principal =
flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
@@ -93,7 +97,6 @@ public class SecurityConfiguration {
flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
this.securityModuleFactories =
Collections.unmodifiableList(securityModuleFactories);
this.securityContextFactory = securityContextFactory;
- this.flinkConfig = checkNotNull(flinkConf);
validate();
}
@@ -138,25 +141,20 @@ public class SecurityConfiguration {
}
private void validate() {
- if (!StringUtils.isBlank(keytab)) {
- // principal is required
- if (StringUtils.isBlank(principal)) {
- throw new IllegalConfigurationException(
- "Kerberos login configuration is invalid: keytab requires a principal.");
- }
+ if (StringUtils.isBlank(keytab) != StringUtils.isBlank(principal)) {
+ throw new IllegalConfigurationException(
+ KERBEROS_CONFIG_ERROR_PREFIX
+ + "either both keytab and principal must be defined, or neither.");
+ }
- // check the keytab is readable
+ if (!StringUtils.isBlank(keytab)) {
File keytabFile = new File(keytab);
if (!keytabFile.exists() || !keytabFile.isFile()) {
throw new IllegalConfigurationException(
- "Kerberos login configuration is invalid: keytab ["
- + keytab
- + "] doesn't exist!");
+ KERBEROS_CONFIG_ERROR_PREFIX + "keytab [" + keytab + "] doesn't exist!");
} else if (!keytabFile.canRead()) {
throw new IllegalConfigurationException(
- "Kerberos login configuration is invalid: keytab ["
- + keytab
- + "] is unreadable!");
+ KERBEROS_CONFIG_ERROR_PREFIX + "keytab [" + keytab + "] is unreadable!");
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java
new file mode 100644
index 00000000000..44fb32cac1d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for the {@link SecurityConfiguration}. */
+class SecurityConfigurationTest {
+
+ @Test
+ public void keytabWithoutPrincipalShouldThrowException() {
+ Configuration configuration = new Configuration();
+ configuration.set(KERBEROS_LOGIN_KEYTAB, "keytab.file");
+
+ IllegalConfigurationException e =
+ assertThrows(
+ IllegalConfigurationException.class,
+ () -> new SecurityConfiguration(configuration));
+ assertTrue(
+ e.getMessage()
+ .contains("either both keytab and principal must be defined, or neither"));
+ }
+
+ @Test
+ public void principalWithoutKeytabShouldThrowException() {
+ Configuration configuration = new Configuration();
+ configuration.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+
+ IllegalConfigurationException e =
+ assertThrows(
+ IllegalConfigurationException.class,
+ () -> new SecurityConfiguration(configuration));
+ assertTrue(
+ e.getMessage()
+ .contains("either both keytab and principal must be defined, or neither"));
+ }
+
+ @Test
+ public void notExistingKeytabShouldThrowException() {
+ Configuration configuration = new Configuration();
+ configuration.set(KERBEROS_LOGIN_KEYTAB, "nonexistingkeytab.file");
+ configuration.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+
+ IllegalConfigurationException e =
+ assertThrows(
+ IllegalConfigurationException.class,
+ () -> new SecurityConfiguration(configuration));
+ assertTrue(e.getMessage().contains("nonexistingkeytab.file"));
+ assertTrue(e.getMessage().contains("doesn't exist"));
+ }
+}