This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new a60b1995d [#5055] support spark authentication with Gravitino (#5040)
a60b1995d is described below
commit a60b1995d5644be589633a8eabea047017840b3a
Author: FANNG <[email protected]>
AuthorDate: Mon Oct 21 15:57:01 2024 +0800
[#5055] support spark authentication with Gravitino (#5040)
### What changes were proposed in this pull request?
support spark authentication with Gravitino,
- simple
- oauth2
- kerberos
### Why are the changes needed?
Fix: #5055
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
I set up the related environment on a local machine and tested creating a
table with the corresponding user.
---
.../org/apache/gravitino/auth/AuthProperties.java | 57 +++++++++++++++++++
.../spark-authentication-with-gravitino.md | 39 +++++++++++++
.../spark/connector/GravitinoSparkConfig.java | 16 ++++++
.../connector/catalog/GravitinoCatalogManager.java | 26 +++------
.../connector/plugin/GravitinoDriverPlugin.java | 65 +++++++++++++++++++++-
5 files changed, 185 insertions(+), 18 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
new file mode 100644
index 000000000..2984ee2ba
--- /dev/null
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/auth/AuthProperties.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.auth;
+
+public class AuthProperties {
+
+ /** The configuration key for the Gravitino client auth type. */
+ public static final String GRAVITINO_CLIENT_AUTH_TYPE = "authType";
+
+ public static final String SIMPLE_AUTH_TYPE = "simple";
+ public static final String OAUTH2_AUTH_TYPE = "oauth2";
+ public static final String KERBEROS_AUTH_TYPE = "kerberos";
+
+ // oauth2
+ /** The configuration key for the URI of the default OAuth server. */
+ public static final String GRAVITINO_OAUTH2_SERVER_URI = "oauth2.serverUri";
+
+ /** The configuration key for the client credential. */
+ public static final String GRAVITINO_OAUTH2_CREDENTIAL = "oauth2.credential";
+
+  /** The configuration key for the path used to get the token. */
+ public static final String GRAVITINO_OAUTH2_TOKEN_PATH = "oauth2.tokenPath";
+
+ /** The configuration key for the scope of the token. */
+ public static final String GRAVITINO_OAUTH2_SCOPE = "oauth2.scope";
+
+ public static boolean isKerberos(String authType) {
+ return KERBEROS_AUTH_TYPE.equalsIgnoreCase(authType);
+ }
+
+ public static boolean isOAuth2(String authType) {
+ return OAUTH2_AUTH_TYPE.equalsIgnoreCase(authType);
+ }
+
+ public static boolean isSimple(String authType) {
+ return authType == null || SIMPLE_AUTH_TYPE.equalsIgnoreCase(authType);
+ }
+
+ private AuthProperties() {}
+}
diff --git a/docs/spark-connector/spark-authentication-with-gravitino.md
b/docs/spark-connector/spark-authentication-with-gravitino.md
new file mode 100644
index 000000000..9ec4a949a
--- /dev/null
+++ b/docs/spark-connector/spark-authentication-with-gravitino.md
@@ -0,0 +1,39 @@
+---
+title: "Spark authentication with Gravitino server"
+slug: /spark-connector/spark-authentication
+keyword: spark connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+Spark connector supports `simple`, `oauth2` and `kerberos` authentication when
accessing the Gravitino server.
+
+| Property | Type | Default Value | Description
| Required | Since Version |
+|------------------------------|--------|---------------|---------------------------------------------------------------------------------------------------------------------|----------|------------------|
+| spark.sql.gravitino.authType | string | `simple` | The authentication
mechanisms when communicating with Gravitino server, supports `simple`,
`oauth2` and `kerberos`. | No | 0.7.0-incubating |
+
+## Simple mode
+
+In the simple mode, the username originates from Spark and is obtained using
the following sequence:
+1. The environment variable of `SPARK_USER`
+2. The environment variable of `HADOOP_USER_NAME`
+3. The user login in the machine
+
+## OAuth2 mode
+
+In the OAuth2 mode, you could use the following configuration to fetch an
OAuth2 token to access Gravitino server.
+
+| Property | Type | Default Value | Description
| Required | Since Version |
+|---------------------------------------|--------|---------------|-----------------------------------------------|----------------------|------------------|
+| spark.sql.gravitino.oauth2.serverUri | string | None | The OAuth2
server uri address. | Yes, for OAuth2 mode | 0.7.0-incubating |
+| spark.sql.gravitino.oauth2.tokenPath | string | None | The path of
token interface in OAuth2 server. | Yes, for OAuth2 mode | 0.7.0-incubating |
+| spark.sql.gravitino.oauth2.credential | string | None | The
credential to request the OAuth2 token. | Yes, for OAuth2 mode |
0.7.0-incubating |
+| spark.sql.gravitino.oauth2.scope | string | None | The scope
to request the OAuth2 token. | Yes, for OAuth2 mode | 0.7.0-incubating |
+
+## Kerberos mode
+
+In Kerberos mode, you can use the Spark Kerberos configuration to fetch a
Kerberos ticket to access the Gravitino server; use `spark.kerberos.principal`
and `spark.kerberos.keytab` to specify the Kerberos principal and keytab.
+
+The principal of the Gravitino server is like `HTTP/$host@$realm`; please keep
`$host` consistent with the host in the Gravitino server URI address.
+Please make sure `krb5.conf` is accessible by Spark, like by specifying the
configuration
`spark.driver.extraJavaOptions="-Djava.security.krb5.conf=/xx/krb5.conf"`.
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
index 160e6f6ec..630ae5c33 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/GravitinoSparkConfig.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.spark.connector;
+import org.apache.gravitino.auth.AuthProperties;
+
public class GravitinoSparkConfig {
private static final String GRAVITINO_PREFIX = "spark.sql.gravitino.";
@@ -26,6 +28,20 @@ public class GravitinoSparkConfig {
public static final String GRAVITINO_METALAKE = GRAVITINO_PREFIX +
"metalake";
public static final String GRAVITINO_ENABLE_ICEBERG_SUPPORT =
GRAVITINO_PREFIX + "enableIcebergSupport";
+
+ public static final String GRAVITINO_AUTH_TYPE =
+ GRAVITINO_PREFIX + AuthProperties.GRAVITINO_CLIENT_AUTH_TYPE;
+ public static final String GRAVITINO_OAUTH2_URI =
+ GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_SERVER_URI;
+ public static final String GRAVITINO_OAUTH2_PATH =
+ GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_TOKEN_PATH;
+ public static final String GRAVITINO_OAUTH2_CREDENTIAL =
+ GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_CREDENTIAL;
+ public static final String GRAVITINO_OAUTH2_SCOPE =
+ GRAVITINO_PREFIX + AuthProperties.GRAVITINO_OAUTH2_SCOPE;
+ public static final String GRAVITINO_KERBEROS_PRINCIPAL =
"spark.kerberos.principal";
+ public static final String GRAVITINO_KERBEROS_KEYTAB_FILE_PATH =
"spark.kerberos.keytab";
+
public static final String GRAVITINO_HIVE_METASTORE_URI = "metastore.uris";
public static final String SPARK_HIVE_METASTORE_URI = "hive.metastore.uris";
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
index cd3bd9cc2..f04193f33 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/catalog/GravitinoCatalogManager.java
@@ -19,14 +19,14 @@
package org.apache.gravitino.spark.connector.catalog;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.GravitinoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,22 +37,18 @@ public class GravitinoCatalogManager {
private volatile boolean isClosed = false;
private final Cache<String, Catalog> gravitinoCatalogs;
- private final String metalakeName;
- private final GravitinoMetalake metalake;
- private final GravitinoAdminClient gravitinoClient;
+ private final GravitinoClient gravitinoClient;
- private GravitinoCatalogManager(String gravitinoUri, String metalakeName) {
- this.metalakeName = metalakeName;
- this.gravitinoClient = GravitinoAdminClient.builder(gravitinoUri).build();
+ private GravitinoCatalogManager(Supplier<GravitinoClient> clientBuilder) {
+ this.gravitinoClient = clientBuilder.get();
// Will not evict catalog by default
this.gravitinoCatalogs = CacheBuilder.newBuilder().build();
- this.metalake = gravitinoClient.loadMetalake(metalakeName);
}
- public static GravitinoCatalogManager create(String gravitinoUrl, String
metalakeName) {
+ public static GravitinoCatalogManager create(Supplier<GravitinoClient>
clientBuilder) {
Preconditions.checkState(
gravitinoCatalogManager == null, "Should not create duplicate
GravitinoCatalogManager");
- gravitinoCatalogManager = new GravitinoCatalogManager(gravitinoUrl,
metalakeName);
+ gravitinoCatalogManager = new GravitinoCatalogManager(clientBuilder);
return gravitinoCatalogManager;
}
@@ -80,12 +76,8 @@ public class GravitinoCatalogManager {
}
}
- public String getMetalakeName() {
- return metalakeName;
- }
-
public void loadRelationalCatalogs() {
- Catalog[] catalogs = metalake.listCatalogsInfo();
+ Catalog[] catalogs = gravitinoClient.listCatalogsInfo();
Arrays.stream(catalogs)
.filter(catalog -> Catalog.Type.RELATIONAL.equals(catalog.type()))
.forEach(catalog -> gravitinoCatalogs.put(catalog.name(), catalog));
@@ -96,7 +88,7 @@ public class GravitinoCatalogManager {
}
private Catalog loadCatalog(String catalogName) {
- Catalog catalog = metalake.loadCatalog(catalogName);
+ Catalog catalog = gravitinoClient.loadCatalog(catalogName);
Preconditions.checkArgument(
Catalog.Type.RELATIONAL.equals(catalog.type()), "Only support
relational catalog");
LOG.info("Load catalog {} from Gravitino successfully.", catalogName);
diff --git
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
index 335e85a38..d46ef3d9f 100644
---
a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
+++
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/plugin/GravitinoDriverPlugin.java
@@ -24,18 +24,26 @@ import static
org.apache.gravitino.spark.connector.utils.ConnectorUtil.removeDup
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.auth.AuthProperties;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.GravitinoClient.ClientBuilder;
+import org.apache.gravitino.client.KerberosTokenProvider;
import org.apache.gravitino.spark.connector.GravitinoSparkConfig;
import org.apache.gravitino.spark.connector.catalog.GravitinoCatalogManager;
import
org.apache.gravitino.spark.connector.iceberg.extensions.GravitinoIcebergSparkSessionExtensions;
import org.apache.gravitino.spark.connector.version.CatalogNameAdaptor;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
@@ -83,7 +91,9 @@ public class GravitinoDriverPlugin implements DriverPlugin {
gravitinoDriverExtensions.addAll(gravitinoIcebergExtensions);
}
- this.catalogManager = GravitinoCatalogManager.create(gravitinoUri,
metalake);
+ this.catalogManager =
+ GravitinoCatalogManager.create(
+ () -> createGravitinoClient(gravitinoUri, metalake, conf,
sc.sparkUser()));
catalogManager.loadRelationalCatalogs();
registerGravitinoCatalogs(conf, catalogManager.getCatalogs());
registerSqlExtensions(conf);
@@ -155,4 +165,57 @@ public class GravitinoDriverPlugin implements DriverPlugin
{
conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS().key(),
extensionString);
}
}
+
+ private static GravitinoClient createGravitinoClient(
+ String uri, String metalake, SparkConf sparkConf, String sparkUser) {
+ ClientBuilder builder =
GravitinoClient.builder(uri).withMetalake(metalake);
+ String authType =
+ sparkConf.get(GravitinoSparkConfig.GRAVITINO_AUTH_TYPE,
AuthProperties.SIMPLE_AUTH_TYPE);
+ if (AuthProperties.isSimple(authType)) {
+ Preconditions.checkArgument(
+ !UserGroupInformation.isSecurityEnabled(),
+ "Spark simple auth mode doesn't support setting kerberos
configurations");
+ builder.withSimpleAuth(sparkUser);
+ } else if (AuthProperties.isOAuth2(authType)) {
+ String oAuthUri = getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_OAUTH2_URI);
+ String credential =
+ getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_OAUTH2_CREDENTIAL);
+ String path = getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_OAUTH2_PATH);
+ String scope = getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_OAUTH2_SCOPE);
+ DefaultOAuth2TokenProvider oAuth2TokenProvider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(oAuthUri)
+ .withCredential(credential)
+ .withPath(path)
+ .withScope(scope)
+ .build();
+ builder.withOAuth(oAuth2TokenProvider);
+ } else if (AuthProperties.isKerberos(authType)) {
+ String principal =
+ getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_KERBEROS_PRINCIPAL);
+ String keyTabFile =
+ getRequiredConfig(sparkConf,
GravitinoSparkConfig.GRAVITINO_KERBEROS_KEYTAB_FILE_PATH);
+ KerberosTokenProvider kerberosTokenProvider =
+ KerberosTokenProvider.builder()
+ .withClientPrincipal(principal)
+ .withKeyTabFile(new File(keyTabFile))
+ .build();
+ builder.withKerberosAuth(kerberosTokenProvider);
+ } else {
+ throw new UnsupportedOperationException("Unsupported auth type: " +
authType);
+ }
+ return builder.build();
+ }
+
+ private static String getRequiredConfig(SparkConf sparkConf, String
configKey) {
+ String configValue = sparkConf.get(configKey, null);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(configValue), configKey + " should not be
empty");
+ return configValue;
+ }
+
+ @Nullable
+ private static String getOptionalConfig(SparkConf sparkConf, String
configKey) {
+ return sparkConf.get(configKey, null);
+ }
}