This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 788bea2695 Spark 3.5: Fix the setting of equalAuthorities in
RemoveOrphanFilesProcedure (#10334)
788bea2695 is described below
commit 788bea2695d78f8d16e7595d41de5ae7ca96b91e
Author: dongwang <[email protected]>
AuthorDate: Thu May 16 20:05:38 2024 +0800
Spark 3.5: Fix the setting of equalAuthorities in
RemoveOrphanFilesProcedure (#10334)
---
.../extensions/TestRemoveOrphanFilesProcedure.java | 83 ++++++++++++++++++++++
.../procedures/RemoveOrphanFilesProcedure.java | 2 +-
2 files changed, 84 insertions(+), 1 deletion(-)
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 5d48daa74f..76eef6a73b 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -666,4 +666,87 @@ public class TestRemoveOrphanFilesProcedure extends ExtensionsTestBase {
// Dropping the table here
sql("DROP TABLE %s", tableName);
}
+
+ @TestTemplate
+ public void testRemoveOrphanFilesProcedureWithEqualAuthorities()
+ throws NoSuchTableException, ParseException, IOException {
+ if (catalogName.equals("testhadoop")) {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
tableName);
+ } else {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
+ tableName, java.nio.file.Files.createTempDirectory(temp, "junit"));
+ }
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ Path originalPath = new Path(table.location());
+
+ URI uri = originalPath.toUri();
+ String originalAuthority = uri.getAuthority() == null ? "" :
uri.getAuthority();
+ Path newParentPath = new Path(uri.getScheme(), "localhost", uri.getPath());
+
+ DataFile dataFile1 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath,
"path/to/data-a.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath,
"path/to/data-b.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+ List<FilePathLastModifiedRecord> allFiles =
+ Lists.newArrayList(
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-a.parquet").toString(),
lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-b.parquet").toString(),
lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ ReachableFileUtil.versionHintLocation(table),
lastModifiedTimestamp));
+
+ for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+ allFiles.add(new FilePathLastModifiedRecord(file,
lastModifiedTimestamp));
+ }
+
+ for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+ allFiles.add(new FilePathLastModifiedRecord(manifest.path(),
lastModifiedTimestamp));
+ }
+
+ Dataset<Row> compareToFileList =
+ spark
+ .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+ .withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+ String fileListViewName = "files_view";
+ compareToFileList.createOrReplaceTempView(fileListViewName);
+ List<Object[]> orphanFiles =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "equal_authorities => map('localhost', '%s'),"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, originalAuthority, fileListViewName);
+ assertThat(orphanFiles).isEmpty();
+
+ // Test with no equal authorities
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, fileListViewName))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageEndingWith("Conflicting authorities/schemes: [(localhost, null)].");
+
+ // Drop table in afterEach has purge and fails due to invalid authority "localhost"
+ // Dropping the table here
+ sql("DROP TABLE %s", tableName);
+ }
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 6e66ea2629..6609efa95e 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -128,7 +128,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
DataTypes.StringType,
DataTypes.StringType,
(k, v) -> {
- equalSchemes.put(k.toString(), v.toString());
+ equalAuthorities.put(k.toString(), v.toString());
return BoxedUnit.UNIT;
});
}