Copilot commented on code in PR #9565:
URL: https://github.com/apache/gravitino/pull/9565#discussion_r2652101491


##########
clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java:
##########
@@ -196,10 +237,20 @@ public Builder withKeyTabFile(File file) {
      *
      * @return The built KerberosTokenProvider instance.
      */
-    @SuppressWarnings("null")
+    @SuppressWarnings("removal")
     public KerberosTokenProvider build() {
       KerberosTokenProvider provider = new KerberosTokenProvider();
 
+      java.security.AccessControlContext context = java.security.AccessController.getContext();
+      Subject subject = Subject.getSubject(context);
+      if (subject != null
+          && (!subject.getPrivateCredentials(KerberosKey.class).isEmpty()
+              || !subject.getPrivateCredentials(KerberosTicket.class).isEmpty())) {
+        provider.subjectProvider = new ExistingSubjectProvider(subject);
+        provider.clientPrincipal = extractPrincipalFromSubject(subject);

Review Comment:
   The extractPrincipalFromSubject method returns null when the Subject
   contains no KerberosPrincipal. That value is assigned to
   provider.clientPrincipal without validation, and getTokenInternal (line 88)
   later splits clientPrincipal without a null check, which would throw a
   NullPointerException. Validate that a principal was found before using the
   ExistingSubjectProvider.
   ```suggestion
           String subjectPrincipal = extractPrincipalFromSubject(subject);
           Preconditions.checkArgument(
               StringUtils.isNotBlank(subjectPrincipal),
               "KerberosTokenProvider must extract a non-null KerberosPrincipal from the Subject");
           Preconditions.checkArgument(
               Splitter.on('@').splitToList(subjectPrincipal).size() == 2,
               "Principal has the wrong format");
           provider.subjectProvider = new ExistingSubjectProvider(subject);
           provider.clientPrincipal = subjectPrincipal;
   ```
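
The same check can be sketched without Guava or commons-lang, for example to unit-test the validation on its own. `PrincipalCheck` and `requireValidPrincipal` are hypothetical names, not part of the PR, and this sketch is slightly stricter than the suggestion above: it also rejects an empty name or realm.

```java
/** Hypothetical helper for illustration only; not part of the PR. */
final class PrincipalCheck {
  private PrincipalCheck() {}

  /**
   * Returns the principal unchanged if it has the form "name@REALM";
   * otherwise throws IllegalArgumentException.
   */
  static String requireValidPrincipal(String principal) {
    if (principal == null || principal.trim().isEmpty()) {
      throw new IllegalArgumentException("No Kerberos principal found in the Subject");
    }
    // A limit of -1 keeps trailing empty strings, so "alice@" splits into ["alice", ""].
    String[] parts = principal.split("@", -1);
    if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
      throw new IllegalArgumentException("Principal has the wrong format: " + principal);
    }
    return principal;
  }
}
```

Failing in build() this way surfaces the problem where the provider is built, rather than later inside getTokenInternal.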



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java:
##########
@@ -46,4 +46,12 @@ private GravitinoCatalogStoreFactoryOptions() {}
           .mapType()
           .defaultValue(ImmutableMap.of())
           .withDescription("The config of Gravitino client");
+
+  public static final String AUTH_TYPE = "gravitino.client.auth.type";
+
+  // OAuth2 config keys
+  public static final String OAUTH2_SERVER_URI = "gravitino.client.oauth2.serverUri";
+  public static final String OAUTH2_CREDENTIAL = "gravitino.client.oauth2.credential";
+  public static final String OAUTH2_PATH = "gravitino.client.oauth2.path";

Review Comment:
   The constant OAUTH2_PATH maps to the key 'gravitino.client.oauth2.path', but
   the documentation refers to this property as
   'gravitino.client.oauth2.tokenPath'. Either rename the key to match the
   documented name or update the documentation to match the code. Since the
   value is the path of the token endpoint, 'tokenPath' is more descriptive
   than 'path'.
   ```suggestion
     public static final String OAUTH2_TOKEN_PATH = "gravitino.client.oauth2.tokenPath";
   ```



##########
docs/flink-connector/flink-authentication-with-gravitino.md:
##########
@@ -0,0 +1,37 @@
+---
+title: "Flink authentication with Gravitino server"
+slug: /flink-connector/flink-authentication
+keyword: flink connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+Flink connector supports `simple`, `oauth2`, and `kerberos` authentication when accessing the Gravitino server.
+
+| Property                   | Type   | Default Value | Description                                                                                                     | Required | Since Version |
+|----------------------------|--------|---------------|-----------------------------------------------------------------------------------------------------------------|----------|---------------|
+| gravitino.client.auth.type | string | (none)        | Only `oauth` is supported; leave unset to use Flink Kerberos settings (or simple auth if security is disabled). | No       | 1.2.0         |

Review Comment:
   The documentation table says "Only `oauth` is supported" but the overview 
mentions that `simple`, `oauth2`, and `kerberos` authentication are all 
supported. This is contradictory. The description should clarify that when 
gravitino.client.auth.type is explicitly set, only 'oauth' is supported as a 
value, while Kerberos and simple authentication are automatically selected 
based on Flink's security settings when the property is not set.
   ```suggestion
   | Property                   | Type   | Default Value | Description                                                                                                                               | Required | Since Version |
   |----------------------------|--------|---------------|------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
   | gravitino.client.auth.type | string | (none)        | When explicitly set, only `oauth` is supported. If unset, Flink selects Kerberos or simple authentication based on its security settings. | No       | 1.2.0         |
   ```
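
The selection rule described above can be sketched as a small pure function. This is an illustration under assumptions, not the connector's actual code: `AuthSelection`, `Mode`, and the `flinkSecurityEnabled` flag are hypothetical names, and the exact-match comparison against `oauth` is assumed.

```java
/** Hypothetical illustration of the documented rule; not the connector's implementation. */
final class AuthSelection {
  enum Mode {
    SIMPLE,
    KERBEROS,
    OAUTH
  }

  private AuthSelection() {}

  /**
   * @param authType value of gravitino.client.auth.type, or null if unset
   * @param flinkSecurityEnabled assumed flag: whether Flink has Kerberos security configured
   */
  static Mode resolve(String authType, boolean flinkSecurityEnabled) {
    if (authType == null || authType.trim().isEmpty()) {
      // Property unset: fall back to Flink's own security settings.
      return flinkSecurityEnabled ? Mode.KERBEROS : Mode.SIMPLE;
    }
    if ("oauth".equals(authType.trim())) {
      return Mode.OAUTH;
    }
    // Any other explicit value is rejected rather than silently ignored.
    throw new IllegalArgumentException("Unsupported gravitino.client.auth.type: " + authType);
  }
}
```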



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java:
##########
@@ -206,4 +240,58 @@ private static boolean checkEqual(
         && gravitinoCatalogManager.metalakeName.equals(metalakeName)
         && gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
   }
+
+  private static GravitinoAdminClient buildOAuthClient(
+      String gravitinoUri, Map<String, String> config) {
+    String serverUri = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+    String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+    String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_PATH);
+    String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+    Preconditions.checkArgument(
+        StringUtils.isNoneBlank(serverUri, credential, path, scope),
+        "OAuth2 auth requires serverUri, credential, path and scope");
+
+    DefaultOAuth2TokenProvider provider =
+        DefaultOAuth2TokenProvider.builder()
+            .withUri(serverUri)
+            .withCredential(credential)
+            .withPath(path)
+            .withScope(scope)
+            .build();
+
+    return GravitinoAdminClient.builder(gravitinoUri)
+        .withOAuth(provider)
+        .withClientConfig(config)
+        .build();
+  }

Review Comment:
   The OAuth2 authentication path in buildOAuthClient is not covered by tests. 
While Kerberos authentication has comprehensive test coverage through 
FlinkHiveKerberosClientIT, OAuth2 authentication should also have integration 
tests to ensure the configuration and authentication flow work correctly.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java:
##########
@@ -206,4 +240,58 @@ private static boolean checkEqual(
         && gravitinoCatalogManager.metalakeName.equals(metalakeName)
         && gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
   }
+
+  private static GravitinoAdminClient buildOAuthClient(
+      String gravitinoUri, Map<String, String> config) {
+    String serverUri = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+    String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+    String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_PATH);
+    String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+    Preconditions.checkArgument(
+        StringUtils.isNoneBlank(serverUri, credential, path, scope),
+        "OAuth2 auth requires serverUri, credential, path and scope");

Review Comment:
   The error message states "OAuth2 auth requires serverUri, credential, path 
and scope" but uses generic parameter names that don't clearly indicate these 
are OAuth2-specific properties. Consider making the error message more explicit 
by including the full configuration key names (e.g., "OAuth2 authentication 
requires: gravitino.client.oauth2.serverUri, 
gravitino.client.oauth2.credential, gravitino.client.oauth2.path, and 
gravitino.client.oauth2.scope") to help users identify which properties are 
missing.
   ```suggestion
           "OAuth2 authentication requires: gravitino.client.oauth2.serverUri, "
               + "gravitino.client.oauth2.credential, gravitino.client.oauth2.path, "
               + "and gravitino.client.oauth2.scope");
   ```
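
A further option, sketched below under assumptions, is to report only the keys that are actually missing instead of listing all four every time. `RequiredKeys` and `require` are hypothetical names, and the plain-JDK blank check stands in for StringUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the PR. */
final class RequiredKeys {
  private RequiredKeys() {}

  /** Throws IllegalArgumentException naming every key whose value is null or blank. */
  static void require(Map<String, String> config, String... keys) {
    List<String> missing = new ArrayList<>();
    for (String key : keys) {
      String value = config.get(key);
      if (value == null || value.trim().isEmpty()) {
        missing.add(key);
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException(
          "OAuth2 authentication is missing required properties: "
              + String.join(", ", missing));
    }
  }
}
```

With this shape, a config that sets only the server URI fails with a message naming just the credential, path, and scope keys.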



##########
clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java:
##########
@@ -155,6 +135,67 @@ void setHost(String host) {
     this.host = host;
   }
 
+  private interface SubjectProvider {
+    Subject get() throws LoginException;
+
+    void close() throws LoginException;
+  }
+
+  private static final class ExistingSubjectProvider implements SubjectProvider {
+    private final Subject subject;
+
+    ExistingSubjectProvider(Subject subject) {
+      this.subject = subject;
+    }
+
+    @Override
+    public Subject get() {
+      return subject;
+    }
+
+    @Override
+    public void close() {
+      // no-op
+    }
+  }
+
+  private static final class LoginSubjectProvider implements SubjectProvider {
+    private final String principal;
+    private final String keytabFile;
+    private LoginContext loginContext;
+
+    LoginSubjectProvider(String principal, String keytabFile) {
+      this.principal = principal;
+      this.keytabFile = keytabFile;
+    }
+
+    @Override
+    public synchronized Subject get() throws LoginException {
+      if (loginContext == null) {
+        loginContext = KerberosUtils.login(principal, keytabFile);
+      } else if (keytabFile != null && isLoginTicketExpired(loginContext)) {
+        loginContext.logout();
+        loginContext = KerberosUtils.login(principal, keytabFile);
+      }
+      return loginContext.getSubject();
+    }
+
+    @Override
+    public void close() throws LoginException {
+      if (loginContext != null) {
+        loginContext.logout();
+      }
+    }
+
+    private boolean isLoginTicketExpired(LoginContext ctx) {
+      Set<KerberosTicket> tickets = ctx.getSubject().getPrivateCredentials(KerberosTicket.class);
+      if (tickets.isEmpty()) {
+        return false;
+      }
+      return tickets.iterator().next().getEndTime().toInstant().isBefore(Instant.now());

Review Comment:
   The isLoginTicketExpired method checks only the first ticket returned by
   the iterator. getPrivateCredentials returns a Set, so which ticket comes
   first is not guaranteed, and if the subject holds multiple Kerberos tickets
   the result may not reflect the overall expiry state. Consider checking all
   tickets or documenting why the first ticket is sufficient.
   ```suggestion
         Instant now = Instant.now();
         for (KerberosTicket ticket : tickets) {
           if (!ticket.getEndTime().toInstant().isBefore(now)) {
             // At least one ticket is still valid.
             return false;
           }
         }
         // All tickets are expired.
         return true;
   ```
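
The "expired only if every ticket is expired" rule can be checked in isolation with plain end times, since constructing real KerberosTicket instances in a unit test is cumbersome. `TicketExpiry` and `allExpired` are hypothetical names; passing `now` as a parameter is an assumption made here so the check is deterministic.

```java
import java.time.Instant;
import java.util.Collection;
import java.util.Date;

/** Hypothetical helper for illustration only; not part of the PR. */
final class TicketExpiry {
  private TicketExpiry() {}

  /**
   * Returns true only if there is at least one end time and every end time is before now.
   * An empty collection returns false, mirroring the original method's behavior.
   */
  static boolean allExpired(Collection<Date> endTimes, Instant now) {
    if (endTimes.isEmpty()) {
      return false;
    }
    for (Date endTime : endTimes) {
      if (!endTime.toInstant().isBefore(now)) {
        // At least one ticket is still valid.
        return false;
      }
    }
    return true;
  }
}
```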



##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.hive;
+
+import static org.apache.gravitino.server.authentication.KerberosConfig.KEYTAB;
+import static org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPAL;
+import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
+
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for creating Gravitino Hive catalogs with Kerberos authentication. This test
+ * verifies that catalog creation works correctly when the Gravitino server is configured with
+ * Kerberos authentication. The test extends FlinkEnvIT directly to keep the test scope focused on
+ * Kerberos-specific scenarios.
+ */
+@Tag("gravitino-docker-test")
+public class FlinkHiveKerberosClientIT extends FlinkEnvIT {
+
+  private static final KerberosSecurityTestcase kdc =
+      new KerberosSecurityTestcase() {
+        @Override
+        public void createMiniKdcConf() {
+          super.createMiniKdcConf();

Review Comment:
   The MAX_TICKET_LIFETIME is set to "5" seconds in the test configuration 
(line 69), which is very short. While this might be intentional to test ticket 
expiration, it could lead to flaky tests if operations take longer than 
expected. Consider documenting why such a short lifetime is used or increasing 
it to a more reasonable value if rapid expiration testing is not the goal.
   ```suggestion
             super.createMiniKdcConf();
             // Use a very short ticket lifetime to speed up Kerberos expiration in tests.
             // The test operations are executed immediately after the MiniKdc is started and
             // are not long-running, so a 5-second lifetime is sufficient and keeps the test
             // fast. If this test ever becomes flaky on slower environments or CI systems,
             // consider increasing this value to a more forgiving lifetime (for example, 60s).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
