andersonad opened a new issue #3829:
URL: https://github.com/apache/iceberg/issues/3829


   When I ran the following code to test the `expire_snapshots` procedure in 
pyspark, I get a "failed to get file system" for an s3 file, despite using the 
S3FileIO for the `spark.sql.catalog.{catalog_name}.io-impl`. Is there a way to 
get around this error in order to execute `expire_snapshots` on AWS s3 iceberg 
tables?
   
   Note that I do not get this error when using `rewrite_manifests`
   
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType, 
IntegerType
   import time
   
   spark_packages = [
       'org.apache.iceberg:iceberg-spark3-runtime:0.12.1',
       'software.amazon.awssdk:bundle:2.16.43',
       'software.amazon.awssdk:url-connection-client:2.16.43',
   ]
   
   catalog_name = 'iceberg_dev'
   
   spark = SparkSession \
       .builder \
       .config('spark.jars.packages', ','.join(spark_packages)) \
       .config('spark.sql.extensions', 
'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
       .config(f'spark.sql.catalog.{catalog_name}', 
'org.apache.iceberg.spark.SparkCatalog') \
       .config(f'spark.sql.catalog.{catalog_name}.warehouse', 
f's3://your_bucket_here/{catalog_name}') \
       .config(f'spark.sql.catalog.{catalog_name}.catalog-impl', 
'org.apache.iceberg.aws.glue.GlueCatalog') \
       .config(f'spark.sql.catalog.{catalog_name}.io-impl', 
'org.apache.iceberg.aws.s3.S3FileIO') \
       .config(f'spark.sql.catalog.{catalog_name}.lock-impl', 
'org.apache.iceberg.aws.glue.DynamoLockManager') \
       .config(f'spark.sql.catalog.{catalog_name}.lock.table', 
f'{catalog_name}') \
       .getOrCreate()
   
   subName = 'product.testtable'
   tableName = f"{catalog_name}.{subName}"
   spark.sql(f"DROP TABLE IF EXISTS {tableName}")
   spark.sql(f"CREATE TABLE {tableName} (id bigint NOT NULL, data string) USING 
iceberg")
   spark.sql(f"INSERT INTO TABLE {tableName} VALUES (1, 'a')")
   time.sleep(3)
   spark.sql(f"INSERT INTO TABLE {tableName} VALUES (2, 'b')")
   expiretime = str(spark.sql(f"SELECT * FROM 
{tableName}.snapshots").tail(1)[0]['committed_at'])
   spark.sql(f"CALL {catalog_name}.system.expire_snapshots('{subName}', 
TIMESTAMP '{expiretime}')").show()
   ```
   
   The error is as follows:
   ```Py4JJavaError: An error occurred while calling o40.sql.
   : org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file 
system for path: 
s3://iceberg_dev/product.db/testtable/metadata/00002-3942827a-4fa3-4b6b-8993-353f34bae6f5.metadata.json
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:50)
        at 
org.apache.iceberg.hadoop.HadoopInputFile.fromPath(HadoopInputFile.java:75)
        at 
org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
        at 
org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
        at 
org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
        at 
org.apache.iceberg.StaticTableOperations.<init>(StaticTableOperations.java:40)
        at 
org.apache.iceberg.hadoop.HadoopTables.newTableOps(HadoopTables.java:195)
        at 
org.apache.iceberg.hadoop.HadoopTables.loadMetadataTable(HadoopTables.java:121)
        at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:82)
        at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:451)
        at 
org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
        at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
        at 
org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:271)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
        at 
org.apache.iceberg.actions.BaseSparkAction.loadMetadataTable(BaseSparkAction.java:148)
        at 
org.apache.iceberg.actions.BaseSparkAction.buildValidDataFileDF(BaseSparkAction.java:95)
        at 
org.apache.iceberg.actions.ExpireSnapshotsAction.buildValidFileDF(ExpireSnapshotsAction.java:226)
        at 
org.apache.iceberg.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:185)
        at 
org.apache.iceberg.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:217)
        at 
org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.lambda$call$0(ExpireSnapshotsProcedure.java:97)
        at 
org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:75)
        at 
org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:64)
        at 
org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.call(ExpireSnapshotsProcedure.java:84)
        at 
org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:33)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
        at 
org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
        at 
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
        at 
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:610)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
        at jdk.internal.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No 
FileSystem for scheme "s3"
        at 
org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
        at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
        at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
        at org.apache.iceberg.hadoop.Util.getFs(Util.java:48)
        ... 53 more
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to