This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a60b1995d [#5055] support spark authentication with Gravitino (#5040)
a60b1995d is described below

commit a60b1995d5644be589633a8eabea047017840b3a
Author: FANNG <[email protected]>
AuthorDate: Mon Oct 21 15:57:01 2024 +0800

    [#5055] support spark authentication with Gravitino (#5040)
    
    ### What changes were proposed in this pull request?
    support spark authentication with Gravitino,
    - simple
    - oauth2
    - kerberos
    
    ### Why are the changes needed?
    
    Fix: #5055
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    I set up the related environment on my local machine and tested creating a table as
    the corresponding user.
---
 .../org/apache/gravitino/auth/AuthProperties.java  | 57 +++++++++++++++++++
 .../spark-authentication-with-gravitino.md         | 39 +++++++++++++
 .../spark/connector/GravitinoSparkConfig.java      | 16 ++++++
 .../connector/catalog/GravitinoCatalogManager.java | 26 +++------
 .../connector/plugin/GravitinoDriverPlugin.java    | 65 +++++++++++++++++++++-
 5 files changed, 185 insertions(+), 18 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
new file mode 100644
index 000000000..2984ee2ba
--- /dev/null
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.auth;
+
+/**
+ * Client-side authentication property keys and auth-type values used when connecting to a
+ * Gravitino server.
+ *
+ * <p>The keys are declared without any prefix; an integration prepends its own namespace (for
+ * example, the Spark connector reads {@link #GRAVITINO_CLIENT_AUTH_TYPE} as {@code
+ * spark.sql.gravitino.authType}). This class only holds constants and static helpers and cannot be
+ * instantiated.
+ */
+public class AuthProperties {
+
+  /** The configuration key for the Gravitino client auth type. */
+  public static final String GRAVITINO_CLIENT_AUTH_TYPE = "authType";
+
+  // Supported values for GRAVITINO_CLIENT_AUTH_TYPE; the is* helpers below compare them
+  // case-insensitively.
+  public static final String SIMPLE_AUTH_TYPE = "simple";
+  public static final String OAUTH2_AUTH_TYPE = "oauth2";
+  public static final String KERBEROS_AUTH_TYPE = "kerberos";
+
+  // oauth2: keys that configure how the client fetches an OAuth2 token.
+  /** The configuration key for the URI of the default OAuth server. */
+  public static final String GRAVITINO_OAUTH2_SERVER_URI = "oauth2.serverUri";
+
+  /** The configuration key for the client credential used to request the OAuth2 token. */
+  public static final String GRAVITINO_OAUTH2_CREDENTIAL = "oauth2.credential";
+
+  /** The configuration key for the path of the token endpoint on the OAuth server. */
+  public static final String GRAVITINO_OAUTH2_TOKEN_PATH = "oauth2.tokenPath";
+
+  /** The configuration key for the scope of the requested token. */
+  public static final String GRAVITINO_OAUTH2_SCOPE = "oauth2.scope";
+
+  /**
+   * Checks whether the given auth type selects Kerberos authentication.
+   *
+   * @param authType the configured auth type, may be {@code null}
+   * @return {@code true} if it equals {@code "kerberos"} ignoring case; {@code false} otherwise,
+   *     including for {@code null}
+   */
+  public static boolean isKerberos(String authType) {
+    return KERBEROS_AUTH_TYPE.equalsIgnoreCase(authType);
+  }
+
+  /**
+   * Checks whether the given auth type selects OAuth2 authentication.
+   *
+   * @param authType the configured auth type, may be {@code null}
+   * @return {@code true} if it equals {@code "oauth2"} ignoring case; {@code false} otherwise,
+   *     including for {@code null}
+   */
+  public static boolean isOAuth2(String authType) {
+    return OAUTH2_AUTH_TYPE.equalsIgnoreCase(authType);
+  }
+
+  /**
+   * Checks whether the given auth type selects simple authentication.
+   *
+   * <p>A {@code null} auth type is treated as simple, so simple is the fallback when nothing is
+   * configured.
+   *
+   * @param authType the configured auth type, may be {@code null}
+   * @return {@code true} if {@code authType} is {@code null} or equals {@code "simple"} ignoring
+   *     case
+   */
+  public static boolean isSimple(String authType) {
+    return authType == null || SIMPLE_AUTH_TYPE.equalsIgnoreCase(authType);
+  }
+
+  // Constants holder; not instantiable.
+  private AuthProperties() {}
+}
diff --git a/docs/spark-connector/spark-authentication-with-gravitino.md 
b/docs/spark-connector/spark-authentication-with-gravitino.md
new file mode 100644
index 000000000..9ec4a949a
--- /dev/null
+++ b/docs/spark-connector/spark-authentication-with-gravitino.md
@@ -0,0 +1,39 @@
+---
+title: "Spark authentication with Gravitino server"
+slug: /spark-connector/spark-authentication
+keyword: spark connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+The Spark connector supports `simple`, `oauth2`, and `kerberos` authentication when accessing the Gravitino server.
+
+| Property                     | Type   | Default Value | Description                                                                                                                    | Required | Since Version    |
+|------------------------------|--------|---------------|---------------------------------------------------------------------------------------------------------------------|----------|------------------|
+| spark.sql.gravitino.authType | string | `simple`      | The authentication mechanism used when communicating with the Gravitino server. Supports `simple`, `oauth2`, and `kerberos`. | No       | 0.7.0-incubating |
+
+## Simple mode
+
+In simple mode, the username comes from Spark and is resolved in the following order:
+1. The environment variable `SPARK_USER`
+2. The environment variable `HADOOP_USER_NAME`
+3. The user logged in to the machine
+
+## OAuth2 mode
+
+In OAuth2 mode, use the following configuration to fetch an OAuth2 token for accessing the Gravitino server.
+
+| Property                              | Type   | Default Value | Description                                          | Required             | Since Version    |
+|---------------------------------------|--------|---------------|-----------------------------------------------|----------------------|------------------|
+| spark.sql.gravitino.oauth2.serverUri  | string | None          | The OAuth2 server URI.                               | Yes, for OAuth2 mode | 0.7.0-incubating |
+| spark.sql.gravitino.oauth2.tokenPath  | string | None          | The path of the token endpoint on the OAuth2 server. | Yes, for OAuth2 mode | 0.7.0-incubating |
+| spark.sql.gravitino.oauth2.credential | string | None          | The credential used to request the OAuth2 token.     | Yes, for OAuth2 mode | 0.7.0-incubating |
+| spark.sql.gravitino.oauth2.scope      | string | None          | The scope used to request the OAuth2 token.          | Yes, for OAuth2 mode | 0.7.0-incubating |
+
+## Kerberos mode
+
+In Kerberos mode, the connector uses Spark's Kerberos configuration to obtain a Kerberos ticket for accessing the Gravitino server. Use `spark.kerberos.principal` and `spark.kerberos.keytab` to specify the Kerberos principal and keytab.
+
+The Gravitino server principal has the form `HTTP/$host@$realm`; make sure `$host` matches the host in the Gravitino server URI.
+Make sure `krb5.conf` is accessible to Spark, for example by setting `spark.driver.extraJavaOptions="-Djava.security.krb5.conf=/xx/krb5.conf"`.
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
index 160e6f6ec..630ae5c33 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.spark.connector;
 
+import org.apache.gravitino.auth.AuthProperties;
+
 public class GravitinoSparkConfig {
 
   private static final String GRAVITINO_PREFIX = "spark.sql.gravitino.";
@@ -26,6 +28,20 @@ public class GravitinoSparkConfig {
   public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX + 
"metalake";
   public static final String GRAVITINO_ENABLE_ICEBERG_SUPPORT =
       GRAVITINO_PREFIX + "enableIcebergSupport";
+
+  public static final String GRAVITINO_AUTH_TYPE =
+      GRAVITINO_PREFIX + AuthProperties.GRAVITINO_CLIENT_AUTH_TYPE;
+  public static final String GRAVITINO_OAUTH2_URI =
+      GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_SERVER_URI;
+  public static final String GRAVITINO_OAUTH2_PATH =
+      GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_TOKEN_PATH;
+  public static final String GRAVITINO_OAUTH2_CREDENTIAL =
+      GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_CREDENTIAL;
+  public static final String GRAVITINO_OAUTH2_SCOPE =
+      GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_SCOPE;
+  public static final String GRAVITINO_KERBEROS_PRINCIPAL = 
"spark.kerberos.principal";
+  public static final String GRAVITINO_KERBEROS_KEYTAB_FILE_PATH = 
"spark.kerberos.keytab";
+
   public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
   public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";
 
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
index cd3bd9cc2..f04193f33 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
@@ -19,14 +19,14 @@
 package org.apache.gravitino.spark.connector.catalog;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import org.apache.gravitino.Catalog;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.GravitinoClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,22 +37,18 @@ public class GravitinoCatalogManager {
 
   private volatile boolean isClosed = false;
   private final Cache<String, Catalog> gravitinoCatalogs;
-  private final String metalakeName;
-  private final GravitinoMetalake metalake;
-  private final GravitinoAdminClient gravitinoClient;
+  private final GravitinoClient gravitinoClient;
 
-  private GravitinoCatalogManager(String gravitinoUri, String metalakeName) {
-    this.metalakeName = metalakeName;
-    this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build();
+  private GravitinoCatalogManager(Supplier<GravitinoClient> clientBuilder) {
+    this.gravitinoClient = clientBuilder.get();
     // Will not evict catalog by default
     this.gravitinoCatalogs = CacheBuilder.newBuilder().build();
-    this.metalake = gravitinoClient.loadMetalake(metalakeName);
   }
 
-  public static GravitinoCatalogManager create(String gravitinoUrl, String 
metalakeName) {
+  public static GravitinoCatalogManager create(Supplier<GravitinoClient> 
clientBuilder) {
     Preconditions.checkState(
         gravitinoCatalogManager == null, "Should not create duplicate 
GravitinoCatalogManager");
-    gravitinoCatalogManager = new GravitinoCatalogManager(gravitinoUrl, 
metalakeName);
+    gravitinoCatalogManager = new GravitinoCatalogManager(clientBuilder);
     return gravitinoCatalogManager;
   }
 
@@ -80,12 +76,8 @@ public class GravitinoCatalogManager {
     }
   }
 
-  public String getMetalakeName() {
-    return metalakeName;
-  }
-
   public void loadRelationalCatalogs() {
-    Catalog[] catalogs = metalake.listCatalogsInfo();
+    Catalog[] catalogs = gravitinoClient.listCatalogsInfo();
     Arrays.stream(catalogs)
         .filter(catalog -> Catalog.Type.RELATIONAL.equals(catalog.type()))
         .forEach(catalog -> gravitinoCatalogs.put(catalog.name(), catalog));
@@ -96,7 +88,7 @@ public class GravitinoCatalogManager {
   }
 
   private Catalog loadCatalog(String catalogName) {
-    Catalog catalog = metalake.loadCatalog(catalogName);
+    Catalog catalog = gravitinoClient.loadCatalog(catalogName);
     Preconditions.checkArgument(
         Catalog.Type.RELATIONAL.equals(catalog.type()), "Only support 
relational catalog");
     LOG.info("Load catalog {} from Gravitino successfully.", catalogName);
diff --git 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
index 335e85a38..d46ef3d9f 100644
--- 
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
+++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
@@ -24,18 +24,26 @@ import static 
org.apache.gravitino.spark.connector.utils.ConnectorUtil.removeDup
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.GravitinoClient.ClientBuilder;
+import org.apache.gravitino.client.KerberosTokenProvider;
 import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
 import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
 import 
org.apache.gravitino.spark.connector.iceberg.extensions.GravitinoIcebergSparkSessionExtensions;
 import org.apache.gravitino.spark.connector.version.CatalogNameAdaptor;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.plugin.DriverPlugin;
@@ -83,7 +91,9 @@ public class GravitinoDriverPlugin implements DriverPlugin {
       gravitinoDriverExtensions.addAll(gravitinoIcebergExtensions);
     }
 
-    this.catalogManager = GravitinoCatalogManager.create(gravitinoUri, 
metalake);
+    this.catalogManager =
+        GravitinoCatalogManager.create(
+            () -> createGravitinoClient(gravitinoUri, metalake, conf, 
sc.sparkUser()));
     catalogManager.loadRelationalCatalogs();
     registerGravitinoCatalogs(conf, catalogManager.getCatalogs());
     registerSqlExtensions(conf);
@@ -155,4 +165,57 @@ public class GravitinoDriverPlugin implements DriverPlugin 
{
       conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(), 
extensionString);
     }
   }
+
+  /**
+   * Builds the Gravitino client used by the driver plugin.
+   *
+   * <p>The client is bound to {@code metalake}. Its authentication mode comes from {@code
+   * spark.sql.gravitino.authType}, which defaults to {@code simple}:
+   *
+   * <ul>
+   *   <li>{@code simple}: authenticates as {@code sparkUser}. Rejected when Hadoop {@code
+   *       UserGroupInformation} reports security as enabled.
+   *   <li>{@code oauth2}: builds a {@code DefaultOAuth2TokenProvider} from the required {@code
+   *       spark.sql.gravitino.oauth2.*} settings (serverUri, credential, tokenPath, scope).
+   *   <li>{@code kerberos}: builds a {@code KerberosTokenProvider} from the required {@code
+   *       spark.kerberos.principal} and {@code spark.kerberos.keytab} settings.
+   * </ul>
+   *
+   * @param uri the Gravitino server URI
+   * @param metalake the metalake the client is bound to
+   * @param sparkConf the Spark configuration to read authentication settings from
+   * @param sparkUser the Spark user name; only used in simple mode
+   * @return a newly built client
+   * @throws IllegalArgumentException if a required setting is missing or blank, or if simple mode
+   *     is selected while Hadoop security is enabled
+   * @throws UnsupportedOperationException if the auth type is not one of the supported values
+   */
+  private static GravitinoClient createGravitinoClient(
+      String uri, String metalake, SparkConf sparkConf, String sparkUser) {
+    ClientBuilder builder = 
GravitinoClient.builder(uri).withMetalake(metalake);
+    // Unset means simple; AuthProperties matches the value case-insensitively.
+    String authType =
+        sparkConf.get(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE, 
AuthProperties.SIMPLE_AUTH_TYPE);
+    if (AuthProperties.isSimple(authType)) {
+      // Fail fast if Hadoop security is on: simple mode would not use those Kerberos settings.
+      Preconditions.checkArgument(
+          !UserGroupInformation.isSecurityEnabled(),
+          "Spark simple auth mode doesn't support setting kerberos 
configurations");
+      builder.withSimpleAuth(sparkUser);
+    } else if (AuthProperties.isOAuth2(authType)) {
+      String oAuthUri = getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_OAUTH2_URI);
+      String credential =
+          getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL);
+      String path = getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH);
+      String scope = getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE);
+      DefaultOAuth2TokenProvider oAuth2TokenProvider =
+          DefaultOAuth2TokenProvider.builder()
+              .withUri(oAuthUri)
+              .withCredential(credential)
+              .withPath(path)
+              .withScope(scope)
+              .build();
+      builder.withOAuth(oAuth2TokenProvider);
+    } else if (AuthProperties.isKerberos(authType)) {
+      // Reuses Spark's own Kerberos settings rather than Gravitino-specific keys.
+      String principal =
+          getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_KERBEROS_PRINCIPAL);
+      String keyTabFile =
+          getRequiredConfig(sparkConf, 
GravitinoSparkConfig.GRAVITINO_KERBEROS_KEYTAB_FILE_PATH);
+      KerberosTokenProvider kerberosTokenProvider =
+          KerberosTokenProvider.builder()
+              .withClientPrincipal(principal)
+              .withKeyTabFile(new File(keyTabFile))
+              .build();
+      builder.withKerberosAuth(kerberosTokenProvider);
+    } else {
+      throw new UnsupportedOperationException("Unsupported auth type: " + 
authType);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Reads a configuration value that must be present and non-blank.
+   *
+   * @param sparkConf the Spark configuration
+   * @param configKey the full configuration key
+   * @return the configured value
+   * @throws IllegalArgumentException if the value is unset, empty, or whitespace only
+   */
+  private static String getRequiredConfig(SparkConf sparkConf, String 
configKey) {
+    String configValue = sparkConf.get(configKey, null);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(configValue), configKey + " should not be 
empty");
+    return configValue;
+  }
+
+  /**
+   * Reads a configuration value that may be absent.
+   *
+   * <p>NOTE(review): nothing in this change calls this helper. Confirm it is needed, or remove it
+   * together with the {@code javax.annotation.Nullable} import.
+   *
+   * @param sparkConf the Spark configuration
+   * @param configKey the full configuration key
+   * @return the configured value, or {@code null} if it is not set
+   */
+  @Nullable
+  private static String getOptionalConfig(SparkConf sparkConf, String 
configKey) {
+    return sparkConf.get(configKey, null);
+  }
 }

Reply via email to