This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 788bea2695 Spark 3.5: Fix the setting of equalAuthorities in RemoveOrphanFilesProcedure (#10334)
788bea2695 is described below

commit 788bea2695d78f8d16e7595d41de5ae7ca96b91e
Author: dongwang <[email protected]>
AuthorDate: Thu May 16 20:05:38 2024 +0800

    Spark 3.5: Fix the setting of equalAuthorities in RemoveOrphanFilesProcedure (#10334)
---
 .../extensions/TestRemoveOrphanFilesProcedure.java | 83 ++++++++++++++++++++++
 .../procedures/RemoveOrphanFilesProcedure.java     |  2 +-
 2 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 5d48daa74f..76eef6a73b 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -666,4 +666,87 @@ public class TestRemoveOrphanFilesProcedure extends ExtensionsTestBase {
     // Dropping the table here
     sql("DROP TABLE %s", tableName);
   }
+
+  @TestTemplate
+  public void testRemoveOrphanFilesProcedureWithEqualAuthorities()
+      throws NoSuchTableException, ParseException, IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    } else {
+      sql(
+          "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
+          tableName, java.nio.file.Files.createTempDirectory(temp, "junit"));
+    }
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Path originalPath = new Path(table.location());
+
+    URI uri = originalPath.toUri();
+    String originalAuthority = uri.getAuthority() == null ? "" : uri.getAuthority();
+    Path newParentPath = new Path(uri.getScheme(), "localhost", uri.getPath());
+
+    DataFile dataFile1 =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+    DataFile dataFile2 =
+        DataFiles.builder(PartitionSpec.unpartitioned())
+            .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+            .withFileSizeInBytes(10)
+            .withRecordCount(1)
+            .build();
+
+    table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+    Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+    List<FilePathLastModifiedRecord> allFiles =
+        Lists.newArrayList(
+            new FilePathLastModifiedRecord(
+                new Path(originalPath, "path/to/data-a.parquet").toString(), lastModifiedTimestamp),
+            new FilePathLastModifiedRecord(
+                new Path(originalPath, "path/to/data-b.parquet").toString(), lastModifiedTimestamp),
+            new FilePathLastModifiedRecord(
+                ReachableFileUtil.versionHintLocation(table), lastModifiedTimestamp));
+
+    for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+      allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
+    }
+
+    for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+      allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
+    }
+
+    Dataset<Row> compareToFileList =
+        spark
+            .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+            .withColumnRenamed("filePath", "file_path")
+            .withColumnRenamed("lastModified", "last_modified");
+    String fileListViewName = "files_view";
+    compareToFileList.createOrReplaceTempView(fileListViewName);
+    List<Object[]> orphanFiles =
+        sql(
+            "CALL %s.system.remove_orphan_files("
+                + "table => '%s',"
+                + "equal_authorities => map('localhost', '%s'),"
+                + "file_list_view => '%s')",
+            catalogName, tableIdent, originalAuthority, fileListViewName);
+    assertThat(orphanFiles).isEmpty();
+
+    // Test with no equal authorities
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CALL %s.system.remove_orphan_files("
+                        + "table => '%s',"
+                        + "file_list_view => '%s')",
+                    catalogName, tableIdent, fileListViewName))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageEndingWith("Conflicting authorities/schemes: [(localhost, null)].");
+
+    // Drop table in afterEach has purge and fails due to invalid authority "localhost"
+    // Dropping the table here
+    sql("DROP TABLE %s", tableName);
+  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 6e66ea2629..6609efa95e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -128,7 +128,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
               DataTypes.StringType,
               DataTypes.StringType,
               (k, v) -> {
-                equalSchemes.put(k.toString(), v.toString());
+                equalAuthorities.put(k.toString(), v.toString());
                 return BoxedUnit.UNIT;
               });
     }

Reply via email to