RussellSpitzer commented on a change in pull request #3872:
URL: https://github.com/apache/iceberg/pull/3872#discussion_r784113791



##########
File path: spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
##########
@@ -232,4 +232,75 @@ public void testInvalidRemoveOrphanFilesCases() {
         IllegalArgumentException.class, "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.remove_orphan_files('')", catalogName));
   }
+
+  @Test
+  public void testConcurrentRemoveOrphanFiles() throws IOException {
+    if (catalogName.equals("testhadoop")) {
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    } else {
+      // give a fresh location to Hive tables as Spark will not clean up the 
table location
+      // correctly while dropping tables through spark_catalog
+      sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg 
LOCATION '%s'",
+          tableName, temp.newFolder());
+    }
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    String metadataLocation = table.location() + "/metadata";
+    String dataLocation = table.location() + "/data";
+
+    // produce orphan files in the data location using parquet
+    sql("CREATE TABLE p (id bigint) USING parquet LOCATION '%s'", 
dataLocation);
+    sql("INSERT INTO TABLE p VALUES (1)");
+    sql("INSERT INTO TABLE p VALUES (10)");
+    sql("INSERT INTO TABLE p VALUES (100)");
+    sql("INSERT INTO TABLE p VALUES (1000)");
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+
+    Timestamp currentTimestamp = 
Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    // check for orphans in the table location
+    List<Object[]> output = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "max_concurrent_deletes => %s," +
+            "older_than => TIMESTAMP '%s')",
+        catalogName, tableIdent, 4, currentTimestamp);
+    Assert.assertEquals("Should be orphan files in the data folder", 4, 
output.size());
+
+    // the previous call should have deleted all orphan files
+    List<Object[]> output3 = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "max_concurrent_deletes => %s," +
+            "older_than => TIMESTAMP '%s')",
+        catalogName, tableIdent, 4, currentTimestamp);
+    Assert.assertEquals("Should be no more orphan files in the data folder", 
0, output3.size());
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
+  @Test
+  public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);

Review comment:
       I see we did it this way in ExpireSnapshots too... I guess it's fine to do it that way here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to