Copilot commented on code in PR #9565:
URL: https://github.com/apache/gravitino/pull/9565#discussion_r2652101491
##########
clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java:
##########
@@ -196,10 +237,20 @@ public Builder withKeyTabFile(File file) {
*
* @return The built KerberosTokenProvider instance.
*/
-  @SuppressWarnings("null")
+  @SuppressWarnings("removal")
   public KerberosTokenProvider build() {
     KerberosTokenProvider provider = new KerberosTokenProvider();
+    java.security.AccessControlContext context = java.security.AccessController.getContext();
+    Subject subject = Subject.getSubject(context);
+    if (subject != null
+        && (!subject.getPrivateCredentials(KerberosKey.class).isEmpty()
+            || !subject.getPrivateCredentials(KerberosTicket.class).isEmpty())) {
+      provider.subjectProvider = new ExistingSubjectProvider(subject);
+      provider.clientPrincipal = extractPrincipalFromSubject(subject);
Review Comment:
The extractPrincipalFromSubject method can return null if no
KerberosPrincipal is found in the subject. This null value is then assigned to
provider.clientPrincipal without validation. Later in getTokenInternal (line
88), this clientPrincipal is split without null checking, which could lead to a
NullPointerException. Add validation to ensure a principal is found before
using the ExistingSubjectProvider.
```suggestion
      String subjectPrincipal = extractPrincipalFromSubject(subject);
      Preconditions.checkArgument(
          StringUtils.isNotBlank(subjectPrincipal),
          "KerberosTokenProvider must extract a non-null KerberosPrincipal from the Subject");
      Preconditions.checkArgument(
          Splitter.on('@').splitToList(subjectPrincipal).size() == 2,
          "Principal has the wrong format");
      provider.subjectProvider = new ExistingSubjectProvider(subject);
      provider.clientPrincipal = subjectPrincipal;
```
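To illustrate the validation the suggestion adds, here is a minimal, self-contained sketch of the `user@REALM` format check. The class and method names (`PrincipalCheck`, `isValidPrincipal`) are illustrative only and are not part of the PR:

```java
// Illustrative sketch of the principal validation logic; PrincipalCheck and
// isValidPrincipal are hypothetical names, not part of the PR.
public class PrincipalCheck {

    // A Kerberos client principal is expected to look like "user@REALM":
    // exactly one '@' with non-empty parts on both sides.
    static boolean isValidPrincipal(String principal) {
        if (principal == null || principal.trim().isEmpty()) {
            return false;
        }
        String[] parts = principal.split("@", -1);
        return parts.length == 2 && !parts[0].isEmpty() && !parts[1].isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isValidPrincipal("[email protected]")); // true
        System.out.println(isValidPrincipal("no-realm"));          // false
        System.out.println(isValidPrincipal(null));                // false
    }
}
```

Validating the format at build time surfaces a clear error immediately instead of a `NullPointerException` later in `getTokenInternal`.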
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java:
##########
@@ -46,4 +46,12 @@ private GravitinoCatalogStoreFactoryOptions() {}
.mapType()
.defaultValue(ImmutableMap.of())
.withDescription("The config of Gravitino client");
+
+ public static final String AUTH_TYPE = "gravitino.client.auth.type";
+
+ // OAuth2 config keys
+  public static final String OAUTH2_SERVER_URI = "gravitino.client.oauth2.serverUri";
+  public static final String OAUTH2_CREDENTIAL = "gravitino.client.oauth2.credential";
+  public static final String OAUTH2_PATH = "gravitino.client.oauth2.path";
Review Comment:
    The property defined in the code is `gravitino.client.oauth2.path` (constant
`OAUTH2_PATH`), but the documentation refers to it as
`gravitino.client.oauth2.tokenPath`. This is inconsistent: either the constant's
value should match the documented property name, or the documentation should be
updated to match the code. Based on the semantic meaning (it is the path to the
token endpoint), `tokenPath` is more descriptive than just `path`.
```suggestion
  public static final String OAUTH2_TOKEN_PATH = "gravitino.client.oauth2.tokenPath";
```
##########
docs/flink-connector/flink-authentication-with-gravitino.md:
##########
@@ -0,0 +1,37 @@
+---
+title: "Flink authentication with Gravitino server"
+slug: /flink-connector/flink-authentication
+keyword: flink connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+Flink connector supports `simple`, `oauth2`, and `kerberos` authentication when accessing the Gravitino server.
+
+| Property                   | Type   | Default Value | Description                                                                                                     | Required | Since Version |
+|----------------------------|--------|---------------|-----------------------------------------------------------------------------------------------------------------|----------|---------------|
+| gravitino.client.auth.type | string | (none)        | Only `oauth` is supported; leave unset to use Flink Kerberos settings (or simple auth if security is disabled). | No       | 1.2.0         |
Review Comment:
The documentation table says "Only `oauth` is supported" but the overview
mentions that `simple`, `oauth2`, and `kerberos` authentication are all
supported. This is contradictory. The description should clarify that when
gravitino.client.auth.type is explicitly set, only 'oauth' is supported as a
value, while Kerberos and simple authentication are automatically selected
based on Flink's security settings when the property is not set.
```suggestion
| Property                   | Type   | Default Value | Description                                                                                                                               | Required | Since Version |
|----------------------------|--------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| gravitino.client.auth.type | string | (none)        | When explicitly set, only `oauth` is supported. If unset, Flink selects Kerberos or simple authentication based on its security settings. | No       | 1.2.0         |
```
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java:
##########
@@ -206,4 +240,58 @@ private static boolean checkEqual(
&& gravitinoCatalogManager.metalakeName.equals(metalakeName)
         && gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
}
+
+ private static GravitinoAdminClient buildOAuthClient(
+ String gravitinoUri, Map<String, String> config) {
+    String serverUri = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+    String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+    String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_PATH);
+    String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+    Preconditions.checkArgument(
+        StringUtils.isNoneBlank(serverUri, credential, path, scope),
+        "OAuth2 auth requires serverUri, credential, path and scope");
+
+ DefaultOAuth2TokenProvider provider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(serverUri)
+ .withCredential(credential)
+ .withPath(path)
+ .withScope(scope)
+ .build();
+
+ return GravitinoAdminClient.builder(gravitinoUri)
+ .withOAuth(provider)
+ .withClientConfig(config)
+ .build();
+ }
Review Comment:
The OAuth2 authentication path in buildOAuthClient is not covered by tests.
While Kerberos authentication has comprehensive test coverage through
FlinkHiveKerberosClientIT, OAuth2 authentication should also have integration
tests to ensure the configuration and authentication flow work correctly.
##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java:
##########
@@ -206,4 +240,58 @@ private static boolean checkEqual(
&& gravitinoCatalogManager.metalakeName.equals(metalakeName)
         && gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
}
+
+ private static GravitinoAdminClient buildOAuthClient(
+ String gravitinoUri, Map<String, String> config) {
+    String serverUri = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+    String credential = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+    String path = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_PATH);
+    String scope = config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+    Preconditions.checkArgument(
+        StringUtils.isNoneBlank(serverUri, credential, path, scope),
+        "OAuth2 auth requires serverUri, credential, path and scope");
Review Comment:
The error message states "OAuth2 auth requires serverUri, credential, path
and scope" but uses generic parameter names that don't clearly indicate these
are OAuth2-specific properties. Consider making the error message more explicit
by including the full configuration key names (e.g., "OAuth2 authentication
requires: gravitino.client.oauth2.serverUri,
gravitino.client.oauth2.credential, gravitino.client.oauth2.path, and
gravitino.client.oauth2.scope") to help users identify which properties are
missing.
```suggestion
        "OAuth2 authentication requires: gravitino.client.oauth2.serverUri, "
            + "gravitino.client.oauth2.credential, gravitino.client.oauth2.path, "
            + "and gravitino.client.oauth2.scope");
```
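The intent of the more explicit message can be sketched with a small helper that reports exactly which OAuth2 keys are missing. This is a minimal sketch: `OAuth2ConfigCheck` and `missingKeys` are hypothetical names, not part of the PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only; OAuth2ConfigCheck and missingKeys are hypothetical names.
public class OAuth2ConfigCheck {

    static final List<String> REQUIRED = Arrays.asList(
        "gravitino.client.oauth2.serverUri",
        "gravitino.client.oauth2.credential",
        "gravitino.client.oauth2.path",
        "gravitino.client.oauth2.scope");

    // Returns the required OAuth2 keys that are absent or blank in the config.
    static List<String> missingKeys(Map<String, String> config) {
        return REQUIRED.stream()
            .filter(k -> config.get(k) == null || config.get(k).trim().isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> config =
            Map.of("gravitino.client.oauth2.serverUri", "http://127.0.0.1:8177");
        // Prints the three unset keys, in declaration order.
        System.out.println(missingKeys(config));
    }
}
```

Building the error message from the exact keys that failed, rather than from a fixed string, tells users precisely which property to set.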
##########
clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java:
##########
@@ -155,6 +135,67 @@ void setHost(String host) {
this.host = host;
}
+ private interface SubjectProvider {
+ Subject get() throws LoginException;
+
+ void close() throws LoginException;
+ }
+
+  private static final class ExistingSubjectProvider implements SubjectProvider {
+ private final Subject subject;
+
+ ExistingSubjectProvider(Subject subject) {
+ this.subject = subject;
+ }
+
+ @Override
+ public Subject get() {
+ return subject;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+ }
+
+ private static final class LoginSubjectProvider implements SubjectProvider {
+ private final String principal;
+ private final String keytabFile;
+ private LoginContext loginContext;
+
+ LoginSubjectProvider(String principal, String keytabFile) {
+ this.principal = principal;
+ this.keytabFile = keytabFile;
+ }
+
+ @Override
+ public synchronized Subject get() throws LoginException {
+ if (loginContext == null) {
+ loginContext = KerberosUtils.login(principal, keytabFile);
+ } else if (keytabFile != null && isLoginTicketExpired(loginContext)) {
+ loginContext.logout();
+ loginContext = KerberosUtils.login(principal, keytabFile);
+ }
+ return loginContext.getSubject();
+ }
+
+ @Override
+ public void close() throws LoginException {
+ if (loginContext != null) {
+ loginContext.logout();
+ }
+ }
+
+ private boolean isLoginTicketExpired(LoginContext ctx) {
+      Set<KerberosTicket> tickets = ctx.getSubject().getPrivateCredentials(KerberosTicket.class);
+ if (tickets.isEmpty()) {
+ return false;
+ }
+      return tickets.iterator().next().getEndTime().toInstant().isBefore(Instant.now());
Review Comment:
The isLoginTicketExpired method only checks the first ticket from the
iterator. If there are multiple Kerberos tickets in the subject's private
credentials, this might not accurately represent the overall expiry state.
Consider checking all tickets or documenting why only the first ticket is
sufficient.
```suggestion
Instant now = Instant.now();
for (KerberosTicket ticket : tickets) {
if (!ticket.getEndTime().toInstant().isBefore(now)) {
// At least one ticket is still valid.
return false;
}
}
// All tickets are expired.
return true;
```
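The suggested all-tickets loop can be exercised in isolation using plain `Instant` end times. This is a minimal sketch: `TicketExpiryCheck` and `allExpired` are illustrative names standing in for the `KerberosTicket` end times, not part of the PR:

```java
import java.time.Instant;
import java.util.List;

// Illustrative sketch: allExpired mirrors the suggested loop over ticket end times.
public class TicketExpiryCheck {

    // Expired only when every end time is in the past; an empty list mirrors
    // the original code's behavior of treating "no tickets" as not expired.
    static boolean allExpired(List<Instant> endTimes, Instant now) {
        if (endTimes.isEmpty()) {
            return false;
        }
        for (Instant end : endTimes) {
            if (!end.isBefore(now)) {
                return false; // at least one ticket is still valid
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        Instant past = now.minusSeconds(60);
        Instant future = now.plusSeconds(60);
        System.out.println(allExpired(List.of(past, future), now)); // false
        System.out.println(allExpired(List.of(past, past), now));   // true
    }
}
```

Treating the subject as expired only when no ticket remains valid avoids needlessly re-logging-in while a usable ticket is still present.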
##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java:
##########
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.hive;
+
+import static org.apache.gravitino.server.authentication.KerberosConfig.KEYTAB;
+import static org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPAL;
+import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
+
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for creating Gravitino Hive catalogs with Kerberos authentication. This test
+ * verifies that catalog creation works correctly when the Gravitino server is configured with
+ * Kerberos authentication. The test extends FlinkEnvIT directly to keep the test scope focused on
+ * Kerberos-specific scenarios.
+ */
+@Tag("gravitino-docker-test")
+public class FlinkHiveKerberosClientIT extends FlinkEnvIT {
+
+ private static final KerberosSecurityTestcase kdc =
+ new KerberosSecurityTestcase() {
+ @Override
+ public void createMiniKdcConf() {
+ super.createMiniKdcConf();
Review Comment:
The MAX_TICKET_LIFETIME is set to "5" seconds in the test configuration
(line 69), which is very short. While this might be intentional to test ticket
expiration, it could lead to flaky tests if operations take longer than
expected. Consider documenting why such a short lifetime is used or increasing
it to a more reasonable value if rapid expiration testing is not the goal.
```suggestion
        super.createMiniKdcConf();
        // Use a very short ticket lifetime to speed up Kerberos expiration in tests.
        // The test operations are executed immediately after the MiniKdc is started and
        // are not long-running, so a 5-second lifetime is sufficient and keeps the test
        // fast. If this test ever becomes flaky on slower environments or CI systems,
        // consider increasing this value to a more forgiving lifetime (for example, 60s).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]