This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 38c420d8c7 [#9564] fix(authn): Fix Flink connector oauth2 mode (#9618)
38c420d8c7 is described below
commit 38c420d8c7bde4080d9a9bd2e7e0b83d48347044
Author: roryqi <[email protected]>
AuthorDate: Thu Jan 8 16:51:52 2026 +0800
[#9564] fix(authn): Fix Flink connector oauth2 mode (#9618)
### What changes were proposed in this pull request?
Fix Flink connector oauth2 mode.
### Why are the changes needed?
This is a follow-up PR.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By hand.
---
.../flink-authentication-with-gravitino.md | 2 +-
.../connector/catalog/GravitinoCatalogManager.java | 26 +++++++++++++++-------
.../store/GravitinoCatalogStoreFactoryOptions.java | 1 +
3 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/docs/flink-connector/flink-authentication-with-gravitino.md
b/docs/flink-connector/flink-authentication-with-gravitino.md
index da5c099fd3..93455a12a0 100644
--- a/docs/flink-connector/flink-authentication-with-gravitino.md
+++ b/docs/flink-connector/flink-authentication-with-gravitino.md
@@ -39,7 +39,7 @@ table.catalog-store.gravitino.gravitino.metalake: my_metalake
table.catalog-store.gravitino.gravitino.client.auth.type: oauth2
table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: https://oauth-server.example.com
table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token
-table.catalog-store.gravitino.gravitino.client.oauth2.credential: your-client-credentials
+table.catalog-store.gravitino.gravitino.client.oauth2.credential: client-id:client-secret
table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope
```
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index bd355ab6f5..7eeb4fab71 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -25,9 +25,7 @@ import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.client.GravitinoMetalake;
@@ -66,7 +64,7 @@ public class GravitinoCatalogManager {
// Only OAuth is explicitly configured; otherwise follow Flink security (Kerberos if enabled,
// simple auth otherwise).
- if (AuthenticatorType.OAUTH.name().equalsIgnoreCase(authType)) {
+ if (GravitinoCatalogStoreFactoryOptions.OAUTH2.equalsIgnoreCase(authType))
{
this.gravitinoClient = buildOAuthClient(gravitinoUri,
gravitinoClientConfig);
} else {
if (authType != null) {
@@ -76,6 +74,11 @@ public class GravitinoCatalogManager {
authType, GravitinoCatalogStoreFactoryOptions.AUTH_TYPE));
}
+ LOG.info(
+ "Flink security enabled: {}, Current user: {}",
+ UserGroupInformation.isSecurityEnabled(),
+ getUgi().getUserName());
+
if (UserGroupInformation.isSecurityEnabled()) {
if (getUgi().getAuthenticationMethod()
!= UserGroupInformation.AuthenticationMethod.KERBEROS) {
@@ -247,14 +250,21 @@ public class GravitinoCatalogManager {
String credential =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
String path =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH);
String scope =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
- Preconditions.checkArgument(
- StringUtils.isNoneBlank(serverUri, credential, path, scope),
- String.format(
- "OAuth2 authentication requires: %s, %s, %s, and %s",
+
+ // Remove OAuth-specific config entries from the client config map. These keys are only
+ // used to construct the OAuth2 token provider and are not valid GravitinoAdminClient
+ // client configuration options; passing them to withClientConfig() could cause validation
+ // errors or other unexpected behavior.
+ Set<String> oauthConfigKeys =
+ Sets.newHashSet(
+ GravitinoCatalogStoreFactoryOptions.AUTH_TYPE,
GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI,
GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL,
GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH,
- GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE));
+ GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+ for (String key : oauthConfigKeys) {
+ config.remove(key);
+ }
DefaultOAuth2TokenProvider provider =
DefaultOAuth2TokenProvider.builder()
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
index f989c3d138..cda4297881 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
@@ -48,6 +48,7 @@ public class GravitinoCatalogStoreFactoryOptions {
.withDescription("The config of Gravitino client");
public static final String AUTH_TYPE = "gravitino.client.auth.type";
+ public static final String OAUTH2 = "oauth2";
// OAuth2 config keys
public static final String OAUTH2_SERVER_URI =
"gravitino.client.oauth2.serverUri";