This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2be676cd3 [core] Introduce SecurityContext (#834)
2be676cd3 is described below

commit 2be676cd339e6b78ff08facc69f2137474ecac2e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 6 10:15:33 2023 +0800

    [core] Introduce SecurityContext (#834)
---
 docs/content/filesystems/hdfs.md                   |  44 +++++-
 .../org/apache/paimon/security/HadoopModule.java   |  82 ++++++++++
 .../paimon/security/HadoopSecurityContext.java     |  42 +++++
 .../paimon/security/KerberosLoginProvider.java     | 115 ++++++++++++++
 .../paimon/security/SecurityConfiguration.java     |  97 ++++++++++++
 .../apache/paimon/security/SecurityContext.java    |  94 +++++++++++
 .../java/org/apache/paimon/utils/StringUtils.java  |  13 ++
 .../security/KerberosLoginProviderITCase.java      | 175 +++++++++++++++++++++
 8 files changed, 658 insertions(+), 4 deletions(-)

diff --git a/docs/content/filesystems/hdfs.md b/docs/content/filesystems/hdfs.md
index 7c4eddc39..a829ac3a6 100644
--- a/docs/content/filesystems/hdfs.md
+++ b/docs/content/filesystems/hdfs.md
@@ -77,6 +77,46 @@ For Alluxio support add the following entry into the 
core-site.xml file:
 </property>
 ```
 
+## Kerberos
+
+{{< tabs "Kerberos" >}}
+
+{{< tab "Flink" >}}
+
+It is recommended to use [Flink Kerberos 
Keytab](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/security/security-kerberos/).
+
+{{< /tab >}}
+
+{{< tab "Spark" >}}
+
+It is recommended to use [Spark Kerberos 
Keytab](https://spark.apache.org/docs/latest/security.html#using-a-keytab).
+
+{{< /tab >}}
+
+{{< tab "Hive" >}}
+
+An intuitive approach is to configure Hive's Kerberos authentication.
+
+{{< /tab >}}
+
+{{< tab "Trino/JavaAPI" >}}
+
+Configure the following three options in your catalog configuration:
+
+- security.kerberos.login.keytab: Absolute path to a Kerberos keytab file 
that contains the user credentials.
+  Please make sure it is copied to each machine.
+- security.kerberos.login.principal: Kerberos principal name associated with 
the keytab.
+- security.kerberos.login.use-ticket-cache: Whether to read credentials from 
your Kerberos ticket cache (true or false; defaults to true).
+
+For JavaAPI:
+```
+SecurityContext.install(catalogOptions);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## HDFS HA
 
 Ensure that `hdfs-site.xml` and `core-site.xml` contain the necessary [HA 
configuration](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html).
@@ -84,7 +124,3 @@ Ensure that `hdfs-site.xml` and `core-site.xml` contain the 
necessary [HA config
 ## HDFS ViewFS
 
 Ensure that `hdfs-site.xml` and `core-site.xml` contain the necessary [ViewFs 
configuration](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/ViewFs.html).
-
-## Kerberos
-
-Ensure that `hdfs-site.xml` and `core-site.xml` contain the necessary 
[Kerberos 
configuration](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html).
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/security/HadoopModule.java 
b/paimon-common/src/main/java/org/apache/paimon/security/HadoopModule.java
new file mode 100644
index 000000000..2ac0e43da
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/security/HadoopModule.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+import static 
org.apache.paimon.security.KerberosLoginProvider.hasUserKerberosAuthMethod;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Responsible for installing a Hadoop login user. */
+public class HadoopModule {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HadoopModule.class);
+
+    private final SecurityConfiguration securityConfig;
+
+    private final Configuration hadoopConfiguration;
+
+    public HadoopModule(
+            SecurityConfiguration securityConfiguration, Configuration 
hadoopConfiguration) {
+        this.securityConfig = checkNotNull(securityConfiguration);
+        this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
+    }
+
+    public void install() throws IOException {
+
+        UserGroupInformation.setConfiguration(hadoopConfiguration);
+
+        UserGroupInformation loginUser;
+
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(securityConfig);
+        if (kerberosLoginProvider.isLoginPossible()) {
+            kerberosLoginProvider.doLogin();
+            loginUser = UserGroupInformation.getLoginUser();
+
+            if (loginUser.isFromKeytab()) {
+                String fileLocation =
+                        
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+                if (fileLocation != null) {
+                    Credentials credentials =
+                            Credentials.readTokenStorageFile(
+                                    new File(fileLocation), 
hadoopConfiguration);
+                    loginUser.addCredentials(credentials);
+                }
+            }
+        } else {
+            loginUser = UserGroupInformation.getLoginUser();
+        }
+
+        LOG.info("Hadoop user set to {}", loginUser);
+        boolean isKerberosSecurityEnabled = 
hasUserKerberosAuthMethod(loginUser);
+        LOG.info("Kerberos security is {}.", isKerberosSecurityEnabled ? 
"enabled" : "disabled");
+        if (isKerberosSecurityEnabled) {
+            LOG.info(
+                    "Kerberos credentials are {}.",
+                    loginUser.hasKerberosCredentials() ? "valid" : "invalid");
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/security/HadoopSecurityContext.java
 
b/paimon-common/src/main/java/org/apache/paimon/security/HadoopSecurityContext.java
new file mode 100644
index 000000000..4d7ac1f07
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/security/HadoopSecurityContext.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+
+/**
+ * Hadoop security context which runs a Callable with the previously 
initialized UGI and appropriate
+ * security credentials.
+ */
+public class HadoopSecurityContext {
+
+    private final UserGroupInformation ugi;
+
+    public HadoopSecurityContext() throws IOException {
+        this.ugi = UserGroupInformation.getLoginUser();
+    }
+
+    public <T> T runSecured(final Callable<T> securedCallable) throws 
Exception {
+        return ugi.doAs((PrivilegedExceptionAction<T>) securedCallable::call);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/security/KerberosLoginProvider.java
 
b/paimon-common/src/main/java/org/apache/paimon/security/KerberosLoginProvider.java
new file mode 100644
index 000000000..0360b6c19
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/security/KerberosLoginProvider.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.paimon.options.Options;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** Provides Kerberos login functionality. */
+public class KerberosLoginProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KerberosLoginProvider.class);
+
+    private final String principal;
+
+    private final String keytab;
+
+    private final boolean useTicketCache;
+
+    public KerberosLoginProvider(Options options) {
+        checkNotNull(options, "options must not be null");
+        SecurityConfiguration securityConfiguration = new 
SecurityConfiguration(options);
+        this.principal = securityConfiguration.getPrincipal();
+        this.keytab = securityConfiguration.getKeytab();
+        this.useTicketCache = securityConfiguration.useTicketCache();
+    }
+
+    public KerberosLoginProvider(SecurityConfiguration config) {
+        checkNotNull(config, "SecurityConfiguration must not be null");
+        this.principal = config.getPrincipal();
+        this.keytab = config.getKeytab();
+        this.useTicketCache = config.useTicketCache();
+    }
+
+    public boolean isLoginPossible() throws IOException {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            LOG.debug("Security is enabled");
+        } else {
+            LOG.debug("Security is NOT enabled");
+            return false;
+        }
+
+        UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
+
+        if (principal != null) {
+            LOG.debug("Login from keytab is possible");
+            return true;
+        } else if (!isProxyUser(currentUser)) {
+            if (useTicketCache && currentUser.hasKerberosCredentials()) {
+                LOG.debug("Login from ticket cache is possible");
+                return true;
+            }
+        } else {
+            throwProxyUserNotSupported();
+        }
+
+        LOG.debug("Login is NOT possible");
+
+        return false;
+    }
+
+    /**
+     * Does kerberos login and sets current user. Must be called when 
isLoginPossible returns true.
+     */
+    public void doLogin() throws IOException {
+        if (principal != null) {
+            LOG.info(
+                    "Attempting to login to KDC using principal: {} keytab: 
{}", principal, keytab);
+            UserGroupInformation.loginUserFromKeytab(principal, keytab);
+            LOG.info("Successfully logged into KDC");
+        } else if (!isProxyUser(UserGroupInformation.getCurrentUser())) {
+            LOG.info("Attempting to load user's ticket cache");
+            UserGroupInformation.loginUserFromSubject(null);
+            LOG.info("Loaded user's ticket cache successfully");
+        } else {
+            throwProxyUserNotSupported();
+        }
+    }
+
+    private void throwProxyUserNotSupported() {
+        throw new UnsupportedOperationException("Proxy user is not supported");
+    }
+
+    public static boolean isProxyUser(UserGroupInformation ugi) {
+        return ugi.getAuthenticationMethod() == 
UserGroupInformation.AuthenticationMethod.PROXY;
+    }
+
+    public static boolean hasUserKerberosAuthMethod(UserGroupInformation ugi) {
+        return UserGroupInformation.isSecurityEnabled()
+                && ugi.getAuthenticationMethod()
+                        == UserGroupInformation.AuthenticationMethod.KERBEROS;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/security/SecurityConfiguration.java
 
b/paimon-common/src/main/java/org/apache/paimon/security/SecurityConfiguration.java
new file mode 100644
index 000000000..aa1b4978c
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/security/SecurityConfiguration.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.StringUtils;
+
+import java.io.File;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/** The security configuration. */
+public class SecurityConfiguration {
+
+    public static final ConfigOption<String> KERBEROS_LOGIN_KEYTAB =
+            key("security.kerberos.login.keytab")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("security.keytab")
+                    .withDescription(
+                            "Absolute path to a Kerberos keytab file that 
contains the user credentials.");
+
+    public static final ConfigOption<String> KERBEROS_LOGIN_PRINCIPAL =
+            key("security.kerberos.login.principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDeprecatedKeys("security.principal")
+                    .withDescription("Kerberos principal name associated with 
the keytab.");
+
+    public static final ConfigOption<Boolean> KERBEROS_LOGIN_USETICKETCACHE =
+            key("security.kerberos.login.use-ticket-cache")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Indicates whether to read from your 
Kerberos ticket cache.");
+
+    private final Options options;
+
+    private final boolean useTicketCache;
+
+    private final String keytab;
+
+    private final String principal;
+
+    public SecurityConfiguration(Options options) {
+        this.options = checkNotNull(options);
+        this.keytab = options.get(KERBEROS_LOGIN_KEYTAB);
+        this.principal = options.get(KERBEROS_LOGIN_PRINCIPAL);
+        this.useTicketCache = options.get(KERBEROS_LOGIN_USETICKETCACHE);
+    }
+
+    public String getKeytab() {
+        return keytab;
+    }
+
+    public String getPrincipal() {
+        return principal;
+    }
+
+    public boolean useTicketCache() {
+        return useTicketCache;
+    }
+
+    public Options getOptions() {
+        return options;
+    }
+
+    public boolean isLegal() {
+        if (StringUtils.isBlank(keytab) != StringUtils.isBlank(principal)) {
+            return false;
+        }
+
+        if (!StringUtils.isBlank(keytab)) {
+            File keytabFile = new File(keytab);
+            return keytabFile.exists() && keytabFile.isFile() && 
keytabFile.canRead();
+        }
+
+        return true;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/security/SecurityContext.java 
b/paimon-common/src/main/java/org/apache/paimon/security/SecurityContext.java
new file mode 100644
index 000000000..cfe817153
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/security/SecurityContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.paimon.annotation.Public;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Security context that provides security module install and holds the 
security context.
+ *
+ * @since 0.4.0
+ */
+@Public
+public class SecurityContext {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SecurityContext.class);
+
+    private static HadoopSecurityContext installedContext;
+
+    /** Installs security configuration. */
+    public static void install(Options options) throws Exception {
+        SecurityConfiguration config = new SecurityConfiguration(options);
+        if (config.isLegal()) {
+            HadoopModule module = createModule(config);
+            if (module != null) {
+                module.install();
+                installedContext = new HadoopSecurityContext();
+            }
+        }
+    }
+
+    /** Run with installed context. */
+    public static <T> T runSecured(final Callable<T> securedCallable) throws 
Exception {
+        return installedContext != null
+                ? installedContext.runSecured(securedCallable)
+                : securedCallable.call();
+    }
+
+    private static HadoopModule createModule(SecurityConfiguration 
securityConfig) {
+        // First check if we have Hadoop in the ClassPath. If not, we simply 
don't do anything.
+        if (!isHadoopCommonOnClasspath(HadoopModule.class.getClassLoader())) {
+            LOG.info(
+                    "Cannot create Hadoop Security Module because Hadoop 
cannot be found in the Classpath.");
+            return null;
+        }
+
+        try {
+            Configuration hadoopConfiguration =
+                    
HadoopUtils.getHadoopConfiguration(securityConfig.getOptions());
+            return new HadoopModule(securityConfig, hadoopConfiguration);
+        } catch (LinkageError e) {
+            LOG.warn(
+                    "Cannot create Hadoop Security Module due to an error that 
happened while instantiating the module. No security module will be loaded.",
+                    e);
+            return null;
+        }
+    }
+
+    public static boolean isHadoopCommonOnClasspath(ClassLoader classLoader) {
+        try {
+            LOG.debug("Checking whether hadoop common dependency in on 
classpath.");
+            Class.forName("org.apache.hadoop.conf.Configuration", false, 
classLoader);
+            Class.forName("org.apache.hadoop.security.UserGroupInformation", 
false, classLoader);
+            LOG.debug("Hadoop common dependency found on classpath.");
+            return true;
+        } catch (ClassNotFoundException e) {
+            LOG.debug("Hadoop common dependency cannot be found on 
classpath.");
+            return false;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 5cf7f2085..745a1c75a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -522,4 +522,17 @@ public class StringUtils {
         }
         return buf.toString();
     }
+
+    public static boolean isBlank(String str) {
+        int strLen;
+        if (str == null || (strLen = str.length()) == 0) {
+            return true;
+        }
+        for (int i = 0; i < strLen; i++) {
+            if ((!Character.isWhitespace(str.charAt(i)))) {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/security/KerberosLoginProviderITCase.java
 
b/paimon-common/src/test/java/org/apache/paimon/security/KerberosLoginProviderITCase.java
new file mode 100644
index 000000000..6c671745d
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/security/KerberosLoginProviderITCase.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.security;
+
+import org.apache.paimon.options.Options;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static 
org.apache.paimon.security.SecurityConfiguration.KERBEROS_LOGIN_KEYTAB;
+import static 
org.apache.paimon.security.SecurityConfiguration.KERBEROS_LOGIN_PRINCIPAL;
+import static 
org.apache.paimon.security.SecurityConfiguration.KERBEROS_LOGIN_USETICKETCACHE;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link KerberosLoginProvider}.
+ *
+ * <p>This class is an ITCase because the mocking breaks the {@link 
UserGroupInformation} class for
+ * other tests.
+ */
+public class KerberosLoginProviderITCase {
+
+    @Test
+    public void isLoginPossibleMustReturnFalseByDefault() throws IOException {
+        Options options = new Options();
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertFalse(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnFalseWithNonKerberos() throws 
IOException {
+        Options options = new Options();
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            
ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(false);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertFalse(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnTrueWithKeytab(@TempDir Path tmpDir) 
throws IOException {
+        Options options = new Options();
+        options.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        options.set(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertTrue(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustReturnTrueWithTGT() throws IOException {
+        Options options = new Options();
+        options.set(KERBEROS_LOGIN_USETICKETCACHE, true);
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            
when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+            ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertTrue(kerberosLoginProvider.isLoginPossible());
+        }
+    }
+
+    @Test
+    public void isLoginPossibleMustThrowExceptionWithProxyUser() {
+        Options options = new Options();
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    
.thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            ugi.when(UserGroupInformation::isSecurityEnabled).thenReturn(true);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertThrows(
+                    UnsupportedOperationException.class, 
kerberosLoginProvider::isLoginPossible);
+        }
+    }
+
+    @Test
+    public void doLoginMustLoginWithKeytab(@TempDir Path tmpDir) throws 
IOException {
+        Options options = new Options();
+        options.set(KERBEROS_LOGIN_PRINCIPAL, "principal");
+        final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab"));
+        options.set(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString());
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            kerberosLoginProvider.doLogin();
+            ugi.verify(() -> 
UserGroupInformation.loginUserFromKeytab(anyString(), anyString()));
+        }
+    }
+
+    @Test
+    public void doLoginMustLoginWithTGT() throws IOException {
+        Options options = new Options();
+        options.set(KERBEROS_LOGIN_USETICKETCACHE, true);
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            
when(userGroupInformation.hasKerberosCredentials()).thenReturn(true);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            kerberosLoginProvider.doLogin();
+            ugi.verify(() -> UserGroupInformation.loginUserFromSubject(null));
+        }
+    }
+
+    @Test
+    public void doLoginMustThrowExceptionWithProxyUser() {
+        Options options = new Options();
+        KerberosLoginProvider kerberosLoginProvider = new 
KerberosLoginProvider(options);
+
+        try (MockedStatic<UserGroupInformation> ugi = 
mockStatic(UserGroupInformation.class)) {
+            UserGroupInformation userGroupInformation = 
mock(UserGroupInformation.class);
+            when(userGroupInformation.getAuthenticationMethod())
+                    
.thenReturn(UserGroupInformation.AuthenticationMethod.PROXY);
+            
ugi.when(UserGroupInformation::getCurrentUser).thenReturn(userGroupInformation);
+
+            assertThrows(UnsupportedOperationException.class, 
kerberosLoginProvider::doLogin);
+        }
+    }
+}

Reply via email to