This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 66a9c4968d9 [FLINK-28000][runtime][security] Throw exception when principal is set in the configuration without keytab
66a9c4968d9 is described below

commit 66a9c4968d90a545d10e03c3c5684da335ec451b
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Jun 13 10:19:47 2022 +0200

    [FLINK-28000][runtime][security] Throw exception when principal is set in the configuration without keytab
---
 .../runtime/security/SecurityConfiguration.java    | 26 ++++----
 .../security/SecurityConfigurationTest.java        | 75 ++++++++++++++++++++++
 2 files changed, 87 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
index 15b81e9551d..4db3e3a6139 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityConfiguration.java
@@ -40,6 +40,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SecurityConfiguration {
 
+    private static final String KERBEROS_CONFIG_ERROR_PREFIX =
+            "Kerberos login configuration is invalid: ";
+
     private final List<String> securityContextFactory;
 
     private final List<String> securityModuleFactories;
@@ -82,6 +85,7 @@ public class SecurityConfiguration {
             Configuration flinkConf,
             List<String> securityContextFactory,
             List<String> securityModuleFactories) {
+        this.flinkConfig = checkNotNull(flinkConf);
         this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
         this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
         this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
@@ -93,7 +97,6 @@ public class SecurityConfiguration {
                 flinkConf.getString(SecurityOptions.ZOOKEEPER_SASL_LOGIN_CONTEXT_NAME);
         this.securityModuleFactories = Collections.unmodifiableList(securityModuleFactories);
         this.securityContextFactory = securityContextFactory;
-        this.flinkConfig = checkNotNull(flinkConf);
         validate();
     }
 
@@ -138,25 +141,20 @@ public class SecurityConfiguration {
     }
 
     private void validate() {
-        if (!StringUtils.isBlank(keytab)) {
-            // principal is required
-            if (StringUtils.isBlank(principal)) {
-                throw new IllegalConfigurationException(
-                        "Kerberos login configuration is invalid: keytab requires a principal.");
-            }
+        if (StringUtils.isBlank(keytab) != StringUtils.isBlank(principal)) {
+            throw new IllegalConfigurationException(
+                    KERBEROS_CONFIG_ERROR_PREFIX
+                            + "either both keytab and principal must be defined, or neither.");
+        }
 
-            // check the keytab is readable
+        if (!StringUtils.isBlank(keytab)) {
             File keytabFile = new File(keytab);
             if (!keytabFile.exists() || !keytabFile.isFile()) {
                 throw new IllegalConfigurationException(
-                        "Kerberos login configuration is invalid: keytab ["
-                                + keytab
-                                + "] doesn't exist!");
+                        KERBEROS_CONFIG_ERROR_PREFIX + "keytab [" + keytab + "] doesn't exist!");
             } else if (!keytabFile.canRead()) {
                 throw new IllegalConfigurationException(
-                        "Kerberos login configuration is invalid: keytab ["
-                                + keytab
-                                + "] is unreadable!");
+                        KERBEROS_CONFIG_ERROR_PREFIX + "keytab [" + keytab + "] is unreadable!");
             }
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java
new file mode 100644
index 00000000000..44fb32cac1d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/SecurityConfigurationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_KEYTAB;
+import static org.apache.flink.configuration.SecurityOptions.KERBEROS_LOGIN_PRINCIPAL;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for the {@link SecurityConfiguration}. */
+class SecurityConfigurationTest {
+
+    @Test
+    public void keytabWithoutPrincipalShouldThrowException() {
+        Configuration configuration = new Configuration();
+        configuration.set(KERBEROS_LOGIN_KEYTAB, "keytab.file");
+
+        IllegalConfigurationException e =
+                assertThrows(
+                        IllegalConfigurationException.class,
+                        () -> new SecurityConfiguration(configuration));
+        assertTrue(
+                e.getMessage()
+                        .contains("either both keytab and principal must be defined, or neither"));
+    }
+
+    @Test
+    public void principalWithoutKeytabShouldThrowException() {
+        Configuration configuration = new Configuration();
+        configuration.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+
+        IllegalConfigurationException e =
+                assertThrows(
+                        IllegalConfigurationException.class,
+                        () -> new SecurityConfiguration(configuration));
+        assertTrue(
+                e.getMessage()
+                        .contains("either both keytab and principal must be defined, or neither"));
+    }
+
+    @Test
+    public void notExistingKeytabShouldThrowException() {
+        Configuration configuration = new Configuration();
+        configuration.set(KERBEROS_LOGIN_KEYTAB, "nonexistingkeytab.file");
+        configuration.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+
+        IllegalConfigurationException e =
+                assertThrows(
+                        IllegalConfigurationException.class,
+                        () -> new SecurityConfiguration(configuration));
+        assertTrue(e.getMessage().contains("nonexistingkeytab.file"));
+        assertTrue(e.getMessage().contains("doesn't exist"));
+    }
+}

Reply via email to