yuqi1129 opened a new issue, #5609:
URL: https://github.com/apache/gravitino/issues/5609

   ### Version
   
   main branch
   
   ### Describe what's wrong
   
   When I running spark to access GCS, the following error occur:
   
   ```
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File 
"/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 
955, in csv
       self._jwrite.csv(path)
     File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/java_gateway.py", 
line 1309, in __call__
       return_value = get_return_value(
     File "/Users/yuqi/venv/lib/python3.9/site-packages/pyspark/sql/utils.py", 
line 111, in deco
       return f(*a, **kw)
     File "/Users/yuqi/venv/lib/python3.9/site-packages/py4j/protocol.py", line 
326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o104.csv.
   : org.apache.gravitino.exceptions.GravitinoRuntimeException: Exception 
occurs when create new FileSystem for actual uri: 
gs://example_qazwsx/example/people, msg: 
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme 
"gs"
        at 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.lambda$getFilesetContext$2(GravitinoVirtualFileSystem.java:431)
        at 
org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
        at 
java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853)
        at 
org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at 
org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at 
org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at 
org.apache.gravitino.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFilesetContext(GravitinoVirtualFileSystem.java:386)
        at 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem.getFileStatus(GravitinoVirtualFileSystem.java:547)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1862)
        at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:117)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at 
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
        at 
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:839)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:750)
   
   ```
   
   ### Error message and/or stacktrace
   
   please see above.
   
   ### How to reproduce
   
   1. create a metalake named `test`, gcs catalog name `gcs_catalog`, schema 
named `schema` and fileset_named `example`.
   2. install pyspark=3.2.0 and gravitino python client.
   ```
   pip install apache-gravitino==0.7.0
   pip install pyspark==3.2.0
   ```
   2. use the following code to access GCS.
   ```
   import logging
   logging.basicConfig(level=logging.INFO)
   
   from gravitino import NameIdentifier, GravitinoClient, Catalog, Fileset, 
GravitinoAdminClient
   
   gravitino_url = "http://localhost:8090";
   metalake_name = "test"
   
   catalog_name = "gcs_catalog"
   schema_name = "schema"
   fileset_name = "example"
   
   fileset_ident = NameIdentifier.of(schema_name, fileset_name)
   
   gravitino_admin_client = GravitinoAdminClient(uri=gravitino_url)
   gravitino_client = GravitinoClient(uri=gravitino_url, 
metalake_name=metalake_name)
   from pyspark.sql import SparkSession
   import os
   
   
   os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars 
/Users/yuqi/project/gravitino/bundles/gcp-bundle/build/libs/gravitino-gcp-bundle-0.8.0-incubating-SNAPSHOT.jar,/Users/yuqi/project/gravitino/clients/filesystem-hadoop3-runtime/build/libs/gravitino-filesystem-hadoop3-runtime-0.8.0-incubating-SNAPSHOT.jar
 --master local[1] pyspark-shell"
   
   spark = SparkSession.builder \
       .appName("s3_fielset_test") \
       .config("spark.hadoop.fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs") \
       .config("spark.hadoop.fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem") \
       .config("spark.hadoop.fs.gravitino.server.uri", "http://localhost:8090";) 
\
       .config("spark.hadoop.fs.gravitino.client.metalake", "test") \
       .config("spark.hadoop.gcs-service-account-file", 
"/Users/yuqi/Downloads/silken-physics-431108-g3-30ab3d97bb60.json") \
       .config("spark.hadoop.fs.gvfs.filesystem.providers", "gcs") \
       .config("spark.driver.memory", "2g") \
       .config("spark.driver.port", "2048") \
       .config("spark.driver.extraClassPath", 
"/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar")
 \
       .config("spark.executor.extraClassPath", 
"/Users/yuqi/Downloads/hadoop-client-runtime-3.3.6.jar:/Users/yuqi/Downloads/hadoop-client-api-3.3.6.jar")
 \
       .getOrCreate()
   
   spark.sparkContext.setLogLevel("DEBUG")  
   
   data = [("Alice", 25), ("Bob", 30), ("Cathy", 45)]
   columns = ["Name", "Age"]
   spark_df = spark.createDataFrame(data, schema=columns)
   gvfs_path = 
f"gvfs://fileset/{catalog_name}/{schema_name}/{fileset_name}/people"
   
   spark_df.coalesce(1).write \
       .mode("overwrite") \
       .option("header", "true") \
       .csv(gvfs_path)
   spark.stop()   
   ``` 
   
   ### Additional context
   
   Reason: The value of `FILE_SYSTEMS_LOADED` in `FileSystem` is always true 
before `provider.getFileSystem(filePath, maps);` in 
`GravitinoVirtualFileSystem` and the GCS filesystem has not been loaded please 
see the value of `SERVICE_FILE_SYSTEMS` before and after:
   
   Before
   ```
   {viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class 
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class 
org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class 
org.apache.hadoop.fs.HarFileSystem, http=class 
org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class 
org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class 
org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class 
org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class 
org.apache.hadoop.fs.http.HttpsFileSystem}
   ```
   
   After:
   ```
   {viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class 
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, file=class 
org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem, har=class 
org.apache.hadoop.fs.HarFileSystem, http=class 
org.apache.hadoop.fs.http.HttpFileSystem, hdfs=class 
org.apache.hadoop.hdfs.DistributedFileSystem, webhdfs=class 
org.apache.hadoop.hdfs.web.WebHdfsFileSystem, nullscan=class 
org.apache.hadoop.hive.ql.io.NullScanFileSystem, https=class 
org.apache.hadoop.fs.http.HttpsFileSystem}
   ```
   
   We need to call `ServiceLoader` again to load all the FileSystem.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to