yuqi1129 opened a new issue, #5609:
URL: https://github.com/apache/gravitino/issues/5609
### Version
main branch
### Describe what's wrong
When running Spark to access GCS, the following error occurs:
```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 955, in csv
    self._jwrite.csv(path)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1309, in __call__
    return_value = get_return_value(
  File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o104.csv.
: org.apache.gravitino.exceptions.GravitinoRuntimeException: Exception occurs when create new FileSystem for actual uri: gs://example_qazwsx/example/people, msg: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.lambda$getFilesetContext$2(GravitinoVirtualFileSystem.java:431)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFilesetContext(GravitinoVirtualFileSystem.java:386)
    at org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFileStatus(GravitinoVirtualFileSystem.java:547)
    at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1862)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:117)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:839)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
```
### Error message and/or stacktrace
Please see the stacktrace above.
### How to reproduce
1. Create a metalake named `test`, a GCS catalog named `gcs_catalog`, a schema named `schema`, and a fileset named `example`.
2. Install PySpark 3.2.0 and the Gravitino Python client:
```
pip install apache-gravitino==0.7.0
pip install pyspark==3.2.0
```
3. Use the following code to access GCS:
```
import logging
import os

from gravitino import NameIdentifier, GravitinoClient, Catalog, Fileset, GravitinoAdminClient
from pyspark.sql import SparkSession

logging.basicConfig(level=logging.INFO)

gravitino_url = "http://localhost:8090"
metalake_name = "test"
catalog_name = "gcs_catalog"
schema_name = "schema"
fileset_name = "example"
fileset_ident = NameIdentifier.of(schema_name, fileset_name)
gravitino_admin_client = GravitinoAdminClient(uri=gravitino_url)
gravitino_client = GravitinoClient(uri=gravitino_url, metalake_name=metalake_name)

# Must be set before the SparkSession is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /Users/yuqi/project/gravitino/bundles/gcp-bundle/build/libs/gravitino-gcp-bundle-0.8.0-incubating-SNAPSHOT.jar,"
    "/Users/yuqi/project/gravitino/clients/filesystem-hadoop3-runtime/build/libs/gravitino-filesystem-hadoop3-runtime-0.8.0-incubating-SNAPSHOT.jar "
    "--master local[1] pyspark-shell"
)

spark = SparkSession.builder \
    .appName("gcs_fileset_test") \
    .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs") \
    .config("spark.hadoop.fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem") \
    .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090") \
    .config("spark.hadoop.fs.gravitino.client.metalake", "test") \
    .config("spark.hadoop.gcs-service-account-file", "/Users/yuqi/Downloads/silken-physics-431108-g3-30ab3d97bb60.json") \
    .config("spark.hadoop.fs.gvfs.filesystem.providers", "gcs") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.port", "2048") \
    .config("spark.driver.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .config("spark.executor.extraClassPath", "/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar") \
    .getOrCreate()
spark.sparkContext.setLogLevel("DEBUG")

data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
columns = ["Name", "Age"]
spark_df = spark.createDataFrame(data, schema=columns)

gvfs_path = f"gvfs://fileset/{catalog_name}/{schema_name}/{fileset_name}/people"

spark_df.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(gvfs_path)
spark.stop()
```
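As a side note, a commonly cited mitigation for the generic Hadoop error `No FileSystem for scheme "gs"` is to map the scheme to the GCS connector classes explicitly, so that scheme resolution does not depend on `ServiceLoader` discovery. Whether this also helps on the GVFS code path here is unverified; the `with_conf` helper below is purely illustrative (not part of PySpark), while the two class names are the standard GCS connector implementations:

```python
# Possible mitigation (unverified for GVFS): register the "gs" scheme
# explicitly instead of relying on ServiceLoader-based discovery.
GCS_SCHEME_CONF = {
    "spark.hadoop.fs.gs.impl":
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "spark.hadoop.fs.AbstractFileSystem.gs.impl":
        "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}

def with_conf(builder, conf):
    """Chain a .config(key, value) call onto `builder` for each entry."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

Applied to the repro above this would read `spark = with_conf(SparkSession.builder.appName("gcs_fileset_test"), GCS_SCHEME_CONF)...getOrCreate()`.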
### Additional context
Reason: the value of `FILE_SYSTEMS_LOADED` in Hadoop's `FileSystem` is already `true` before `GravitinoVirtualFileSystem` calls `provider.getFileSystem(filePath, maps);`, yet the GCS file system has never been loaded. Compare the value of `SERVICE_FILE_SYSTEMS` before and after the call; the `gs` scheme is missing in both:
Before:
```
{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem,
 swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem,
 file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem,
 har=class org.apache.hadoop.fs.HarFileSystem,
 http=class org.apache.hadoop.fs.http.HttpFileSystem,
 hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem,
 webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem,
 nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem,
 https=class org.apache.hadoop.fs.http.HttpsFileSystem}
```
After:
```
{viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem,
 swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem,
 file=class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem,
 har=class org.apache.hadoop.fs.HarFileSystem,
 http=class org.apache.hadoop.fs.http.HttpFileSystem,
 hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem,
 webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem,
 nullscan=class org.apache.hadoop.hive.ql.io.NullScanFileSystem,
 https=class org.apache.hadoop.fs.http.HttpsFileSystem}
```
We need to invoke the `ServiceLoader` again so that all `FileSystem` implementations, including GCS, get loaded.
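To check from the PySpark side whether the scheme registration eventually succeeds, one can ask Hadoop directly which class it resolves for a given scheme through the py4j gateway. This is a hedged diagnostic sketch: it relies on the public static `FileSystem.getFileSystemClass(scheme, conf)` method and on PySpark's internal `_jvm`/`_jsc` handles, which are not stable public API:

```python
def hadoop_fs_class_for_scheme(spark, scheme):
    """Return the FileSystem class name Hadoop resolves for `scheme`,
    or None when no implementation is registered (the symptom here)."""
    jvm = spark._jvm
    hadoop_conf = spark._jsc.hadoopConfiguration()
    try:
        cls = jvm.org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoop_conf)
        return cls.getName()
    except Exception:
        # Hadoop raises UnsupportedFileSystemException for unknown schemes,
        # surfaced through py4j as a Java error; report it as "not registered".
        return None
```

With the failing session above, `hadoop_fs_class_for_scheme(spark, "gs")` should return `None` until the GCS provider is actually loaded.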