ebyhr commented on code in PR #16643:
URL: https://github.com/apache/iceberg/pull/16643#discussion_r3339422317
##########
spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMigrateTableProcedure.java:
##########
@@ -305,4 +310,43 @@ public void testMigrateBucketedTable() throws IOException {
"Cannot create an Iceberg table from a bucketed source table: "
+ "4 buckets, bucket columns: [id]");
}
+
+ @TestTemplate
+ public void testMigrateIgnoreMissingFiles() throws IOException {
+ assumeThat(catalogName).isEqualToIgnoringCase("spark_catalog");
+ String location = Files.createTempDirectory(temp, "junit").toFile().toString();
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, data string) USING parquet PARTITIONED BY (id) LOCATION '%s'",
+ tableName, location);
+ sql("INSERT INTO TABLE %s (id, data) VALUES (1, 'a'), (2, 'b')", tableName);
+
+ // Remove one partition's directory while leaving the catalog entry intact to simulate a
+ // concurrent deletion racing with the migration.
+ deleteDirectory(Paths.get(location, "id=1"));
+
+ Object result =
+ scalarSql(
+ "CALL %s.system.migrate(table => '%s', ignore_missing_files => true)",
+ catalogName, tableName);
+ assertThat(result).as("Should have imported only the surviving partition").isEqualTo(1L);
+
+ assertEquals(
+ "Migrated table should only contain rows from the surviving partition",
+ ImmutableList.of(row("b", 2L)),
+ sql("SELECT * FROM %s", tableName));
+ }
+
+ private static void deleteDirectory(Path dir) throws IOException {
Review Comment:
We could use the `FileUtils.deleteDirectory` method instead of adding this helper.
##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -72,6 +72,18 @@ default MigrateTable executeWith(ExecutorService service) {
throw new UnsupportedOperationException("Setting executor service is not supported");
}
+ /**
+ * Sets whether to ignore {@link java.io.FileNotFoundException} when listing source data files.
+ * When set to {@code true}, partitions whose files have been deleted concurrently are skipped
+ * with a warning instead of failing the migration. The default is {@code false}.
+ *
+ * @param ignore whether to ignore missing source files
+ * @return this for method chaining
+ */
+ default MigrateTable ignoreMissingFiles(boolean ignore) {
Review Comment:
I'm wondering if we really need the `boolean ignore` argument here. We could
remove it and change MigrateTableProcedure to something like:
```java
boolean ignoreMissingFiles = input.asBoolean(IGNORE_MISSING_FILES_PARAM, false);
if (ignoreMissingFiles) {
  migrateTableSparkAction = migrateTableSparkAction.ignoreMissingFiles();
}
```
The existing `drop_backup` parameter employs this style.
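
To make the shape of that suggestion concrete, here is a minimal, self-contained sketch. It is not the real Iceberg code: `MigrateAction`, `SketchAction`, `configure`, and the `Map`-based arguments are hypothetical stand-ins for `MigrateTable`, the Spark action, and the procedure input. It only shows a no-arg flag method that the caller invokes when the parameter is true.

```java
import java.util.Map;

public class MigrateFlagSketch {

  // Hypothetical stand-in for the action interface; not the real MigrateTable.
  interface MigrateAction {
    MigrateAction dropBackup();

    // No-arg flag setter: calling it turns the behavior on, not calling it keeps the default (off).
    default MigrateAction ignoreMissingFiles() {
      throw new UnsupportedOperationException("Ignoring missing files is not supported");
    }
  }

  // Hypothetical implementation that just records which flags were set.
  static final class SketchAction implements MigrateAction {
    private boolean dropBackup = false;
    private boolean ignoreMissingFiles = false;

    @Override
    public SketchAction dropBackup() {
      this.dropBackup = true;
      return this;
    }

    @Override
    public SketchAction ignoreMissingFiles() {
      this.ignoreMissingFiles = true;
      return this;
    }

    boolean dropsBackup() {
      return dropBackup;
    }

    boolean ignoresMissingFiles() {
      return ignoreMissingFiles;
    }
  }

  // Mirrors the procedure-side pattern: read the boolean parameter (default false)
  // and call the no-arg method only when it is true.
  static SketchAction configure(Map<String, Boolean> args) {
    SketchAction action = new SketchAction();
    if (args.getOrDefault("drop_backup", false)) {
      action.dropBackup();
    }
    if (args.getOrDefault("ignore_missing_files", false)) {
      action.ignoreMissingFiles();
    }
    return action;
  }

  public static void main(String[] args) {
    SketchAction action = configure(Map.of("ignore_missing_files", true));
    System.out.println("ignoreMissingFiles=" + action.ignoresMissingFiles());
    System.out.println("dropBackup=" + action.dropsBackup());
  }
}
```

With this shape the boolean lives only in the procedure parameter, and the action API exposes a single way to opt in.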
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]