This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6abb99f0a7 Spark 3.4, 3.3: Fix the setting of equalAuthorities in RemoveOrphanFilesProcedure (#10342)
6abb99f0a7 is described below
commit 6abb99f0a72e27165131cf73b6e0ff587a8759c5
Author: dongwang <[email protected]>
AuthorDate: Fri May 17 15:30:28 2024 +0800
Spark 3.4, 3.3: Fix the setting of equalAuthorities in RemoveOrphanFilesProcedure (#10342)
---
.../extensions/TestRemoveOrphanFilesProcedure.java | 83 ++++++++++++++++++++++
.../procedures/RemoveOrphanFilesProcedure.java | 2 +-
.../extensions/TestRemoveOrphanFilesProcedure.java | 83 ++++++++++++++++++++++
.../procedures/RemoveOrphanFilesProcedure.java | 2 +-
4 files changed, 168 insertions(+), 2 deletions(-)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 05eb7a6f80..670c4e5657 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -694,4 +694,87 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
// Dropping the table here
sql("DROP TABLE %s", tableName);
}
+
+ @Test
+ public void testRemoveOrphanFilesProcedureWithEqualAuthorities()
+ throws NoSuchTableException, ParseException, IOException {
+ if (catalogName.equals("testhadoop")) {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ } else {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
+ tableName, temp.newFolder().toURI().toString());
+ }
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ Path originalPath = new Path(table.location());
+
+ URI uri = originalPath.toUri();
+ String originalAuthority = uri.getAuthority() == null ? "" : uri.getAuthority();
+ Path newParentPath = new Path(uri.getScheme(), "localhost", uri.getPath());
+
+ DataFile dataFile1 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+ List<FilePathLastModifiedRecord> allFiles =
+ Lists.newArrayList(
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-a.parquet").toString(), lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-b.parquet").toString(), lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ ReachableFileUtil.versionHintLocation(table), lastModifiedTimestamp));
+
+ for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+ allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
+ }
+
+ for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+ allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
+ }
+
+ Dataset<Row> compareToFileList =
+ spark
+ .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+ .withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+ String fileListViewName = "files_view";
+ compareToFileList.createOrReplaceTempView(fileListViewName);
+ List<Object[]> orphanFiles =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "equal_authorities => map('localhost', '%s'),"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, originalAuthority, fileListViewName);
+ Assert.assertEquals(0, orphanFiles.size());
+
+ // Test with no equal authorities
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, fileListViewName))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageEndingWith("Conflicting authorities/schemes: [(localhost, null)].");
+
+ // Drop table in afterEach has purge and fails due to invalid authority "localhost"
+ // Dropping the table here
+ sql("DROP TABLE %s", tableName);
+ }
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 6e66ea2629..6609efa95e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -128,7 +128,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
DataTypes.StringType,
DataTypes.StringType,
(k, v) -> {
- equalSchemes.put(k.toString(), v.toString());
+ equalAuthorities.put(k.toString(), v.toString());
return BoxedUnit.UNIT;
});
}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 80b515d344..56854c561f 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -680,4 +680,87 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
// Dropping the table here
sql("DROP TABLE %s", tableName);
}
+
+ @Test
+ public void testRemoveOrphanFilesProcedureWithEqualAuthorities()
+ throws NoSuchTableException, ParseException, IOException {
+ if (catalogName.equals("testhadoop")) {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ } else {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg LOCATION '%s'",
+ tableName, temp.newFolder().toURI().toString());
+ }
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+ Path originalPath = new Path(table.location());
+
+ URI uri = originalPath.toUri();
+ String originalAuthority = uri.getAuthority() == null ? "" : uri.getAuthority();
+ Path newParentPath = new Path(uri.getScheme(), "localhost", uri.getPath());
+
+ DataFile dataFile1 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-a.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ DataFile dataFile2 =
+ DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath(new Path(newParentPath, "path/to/data-b.parquet").toString())
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+
+ table.newFastAppend().appendFile(dataFile1).appendFile(dataFile2).commit();
+
+ Timestamp lastModifiedTimestamp = new Timestamp(10000);
+
+ List<FilePathLastModifiedRecord> allFiles =
+ Lists.newArrayList(
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-a.parquet").toString(), lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ new Path(originalPath, "path/to/data-b.parquet").toString(), lastModifiedTimestamp),
+ new FilePathLastModifiedRecord(
+ ReachableFileUtil.versionHintLocation(table), lastModifiedTimestamp));
+
+ for (String file : ReachableFileUtil.metadataFileLocations(table, true)) {
+ allFiles.add(new FilePathLastModifiedRecord(file, lastModifiedTimestamp));
+ }
+
+ for (ManifestFile manifest : TestHelpers.dataManifests(table)) {
+ allFiles.add(new FilePathLastModifiedRecord(manifest.path(), lastModifiedTimestamp));
+ }
+
+ Dataset<Row> compareToFileList =
+ spark
+ .createDataFrame(allFiles, FilePathLastModifiedRecord.class)
+ .withColumnRenamed("filePath", "file_path")
+ .withColumnRenamed("lastModified", "last_modified");
+ String fileListViewName = "files_view";
+ compareToFileList.createOrReplaceTempView(fileListViewName);
+ List<Object[]> orphanFiles =
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "equal_authorities => map('localhost', '%s'),"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, originalAuthority, fileListViewName);
+ Assert.assertEquals(0, orphanFiles.size());
+
+ // Test with no equal authorities
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CALL %s.system.remove_orphan_files("
+ + "table => '%s',"
+ + "file_list_view => '%s')",
+ catalogName, tableIdent, fileListViewName))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageEndingWith("Conflicting authorities/schemes: [(localhost, null)].");
+
+ // Drop table in afterEach has purge and fails due to invalid authority "localhost"
+ // Dropping the table here
+ sql("DROP TABLE %s", tableName);
+ }
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 6e66ea2629..6609efa95e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -128,7 +128,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
DataTypes.StringType,
DataTypes.StringType,
(k, v) -> {
- equalSchemes.put(k.toString(), v.toString());
+ equalAuthorities.put(k.toString(), v.toString());
return BoxedUnit.UNIT;
});
}