andersonad opened a new issue #3829:
URL: https://github.com/apache/iceberg/issues/3829
When I run the following code to test the `expire_snapshots` procedure in PySpark, I get a "Failed to get file system" error for an S3 path, despite setting `spark.sql.catalog.{catalog_name}.io-impl` to S3FileIO. Is there a way to get around this error so that `expire_snapshots` can be run on Iceberg tables stored in AWS S3?
Note that I do not get this error when using `rewrite_manifests`.
```
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time

spark_packages = [
    'org.apache.iceberg:iceberg-spark3-runtime:0.12.1',
    'software.amazon.awssdk:bundle:2.16.43',
    'software.amazon.awssdk:url-connection-client:2.16.43',
]

catalog_name = 'iceberg_dev'

spark = SparkSession \
    .builder \
    .config('spark.jars.packages', ','.join(spark_packages)) \
    .config('spark.sql.extensions',
            'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
    .config(f'spark.sql.catalog.{catalog_name}',
            'org.apache.iceberg.spark.SparkCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.warehouse',
            f's3://your_bucket_here/{catalog_name}') \
    .config(f'spark.sql.catalog.{catalog_name}.catalog-impl',
            'org.apache.iceberg.aws.glue.GlueCatalog') \
    .config(f'spark.sql.catalog.{catalog_name}.io-impl',
            'org.apache.iceberg.aws.s3.S3FileIO') \
    .config(f'spark.sql.catalog.{catalog_name}.lock-impl',
            'org.apache.iceberg.aws.glue.DynamoLockManager') \
    .config(f'spark.sql.catalog.{catalog_name}.lock.table',
            f'{catalog_name}') \
    .getOrCreate()

subName = 'product.testtable'
tableName = f"{catalog_name}.{subName}"

# Recreate a small test table and commit two snapshots a few seconds apart.
spark.sql(f"DROP TABLE IF EXISTS {tableName}")
spark.sql(f"CREATE TABLE {tableName} (id bigint NOT NULL, data string) USING iceberg")
spark.sql(f"INSERT INTO TABLE {tableName} VALUES (1, 'a')")
time.sleep(3)
spark.sql(f"INSERT INTO TABLE {tableName} VALUES (2, 'b')")

# Use the newest snapshot's committed_at as the cutoff, so the first
# snapshot should be expired.
expiretime = str(spark.sql(f"SELECT * FROM {tableName}.snapshots").tail(1)[0]['committed_at'])
spark.sql(f"CALL {catalog_name}.system.expire_snapshots('{subName}', TIMESTAMP '{expiretime}')").show()
```
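For reference, the same call can be written with Iceberg's named procedure arguments, which makes the cutoff explicit; this should behave identically to the positional form above:
```
# Equivalent call with named arguments: `older_than` is the expiry cutoff,
# so every snapshot committed strictly before the newest snapshot's
# committed_at becomes eligible for expiration.
spark.sql(
    f"CALL {catalog_name}.system.expire_snapshots("
    f"table => '{subName}', older_than => TIMESTAMP '{expiretime}')"
).show()
```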
The error is as follows:
```
Py4JJavaError: An error occurred while calling o40.sql.
: org.apache.iceberg.exceptions.RuntimeIOException: Failed to get file system for path: s3://iceberg_dev/product.db/testtable/metadata/00002-3942827a-4fa3-4b6b-8993-353f34bae6f5.metadata.json
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:50)
	at org.apache.iceberg.hadoop.HadoopInputFile.fromPath(HadoopInputFile.java:75)
	at org.apache.iceberg.hadoop.HadoopInputFile.fromLocation(HadoopInputFile.java:54)
	at org.apache.iceberg.hadoop.HadoopFileIO.newInputFile(HadoopFileIO.java:59)
	at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
	at org.apache.iceberg.StaticTableOperations.<init>(StaticTableOperations.java:40)
	at org.apache.iceberg.hadoop.HadoopTables.newTableOps(HadoopTables.java:195)
	at org.apache.iceberg.hadoop.HadoopTables.loadMetadataTable(HadoopTables.java:121)
	at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:82)
	at org.apache.iceberg.spark.SparkCatalog.load(SparkCatalog.java:451)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:116)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:79)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:271)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:248)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
	at org.apache.iceberg.actions.BaseSparkAction.loadMetadataTable(BaseSparkAction.java:148)
	at org.apache.iceberg.actions.BaseSparkAction.buildValidDataFileDF(BaseSparkAction.java:95)
	at org.apache.iceberg.actions.ExpireSnapshotsAction.buildValidFileDF(ExpireSnapshotsAction.java:226)
	at org.apache.iceberg.actions.ExpireSnapshotsAction.expire(ExpireSnapshotsAction.java:185)
	at org.apache.iceberg.actions.ExpireSnapshotsAction.execute(ExpireSnapshotsAction.java:217)
	at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.lambda$call$0(ExpireSnapshotsProcedure.java:97)
	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:75)
	at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:64)
	at org.apache.iceberg.spark.procedures.ExpireSnapshotsProcedure.call(ExpireSnapshotsProcedure.java:84)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:33)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:610)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
	at jdk.internal.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
	at org.apache.iceberg.hadoop.Util.getFs(Util.java:48)
	... 53 more
```
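Reading the trace, the procedure's metadata-table load goes through `HadoopTables`/`HadoopFileIO` rather than the configured `S3FileIO`, so Hadoop is asked for a `FileSystem` for the bare `s3` scheme and finds none. A minimal sketch of a Hadoop-side fallback I could try (untested; it assumes `hadoop-aws` is added to the packages, and the version below is a guess that would need to match the Hadoop version Spark was built against):
```
from pyspark.sql import SparkSession

# Untested workaround sketch: map the bare "s3" scheme to Hadoop's S3A
# filesystem so the HadoopFileIO fallback seen in the trace can resolve
# s3:// paths. The hadoop-aws version is an assumption.
fallback_packages = spark_packages + [
    'org.apache.hadoop:hadoop-aws:3.2.0',  # assumed version
]

spark = SparkSession \
    .builder \
    .config('spark.jars.packages', ','.join(fallback_packages)) \
    .config('spark.hadoop.fs.s3.impl',
            'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .getOrCreate()  # plus the Iceberg catalog configs from the repro above
```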