nastra commented on code in PR #4325:
URL: https://github.com/apache/iceberg/pull/4325#discussion_r1119993510
##########
docs/spark-procedures.md:
##########
@@ -425,11 +425,12 @@ By default, the original table is retained with the name `table_BACKUP_`.
#### Usage
-| Argument Name | Required? | Type | Description |
-|---------------|-----------|------|-------------|
-| `table` | ✔️ | string | Name of the table to migrate |
-| `properties` | | map<string, string> | Properties for the new Iceberg table |
-| `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
+| Argument Name | Required? | Type | Description |
+|-----------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to migrate |
+| `properties` | | map<string, string> | Properties for the new Iceberg table |
+| `drop_backup` | | boolean | When true, the original table will not be retained as backup (defaults to false) |
Review Comment:
nit: the `|` separators don't seem to be aligned in the file itself (that
probably makes no difference to the rendered document in the browser).
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());
+
+    // Corrupt the second file
+    File file = expectedFiles.get(1);
+    Assume.assumeTrue("Delete source file!", file.delete());
+    Assume.assumeTrue("Create a empty source file!", file.createNewFile());
Review Comment:
I don't think we should use `assumeTrue` here, because a failed assumption
silently skips the test instead of failing it. I'd suggest asserting instead.
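
To illustrate what I mean, here is a minimal sketch in plain Java so that it stands on its own. `CorruptFileSetup` and `replaceWithEmptyFile` are hypothetical names, not part of this PR; in the actual test the same checks would presumably be AssertJ or JUnit assertions. The point is only that a failed `delete()` or `createNewFile()` raises an error instead of quietly skipping the test.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/** Hypothetical helper, not part of the PR: corrupts a data file and fails loudly if it cannot. */
final class CorruptFileSetup {
  private CorruptFileSetup() {}

  /** Replaces {@code file} with an empty file; any failed step raises an AssertionError. */
  static void replaceWithEmptyFile(File file) throws IOException {
    // With AssertJ this could read: assertThat(file.delete()).as("Delete source file").isTrue();
    if (!file.delete()) {
      throw new AssertionError("Failed to delete source file: " + file);
    }
    if (!file.createNewFile()) {
      throw new AssertionError("Failed to create empty source file: " + file);
    }
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for one of the table's data files.
    File file = File.createTempFile("part-00000", ".parquet");
    Files.write(file.toPath(), new byte[] {1, 2, 3});

    replaceWithEmptyFile(file);
    System.out.println("length after corruption: " + file.length());

    Files.delete(file.toPath());
  }
}
```

With an assumption, a broken setup shows up as a skipped test that is easy to miss; with an assertion, it shows up as a failure.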
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());
Review Comment:
```suggestion
Assertions.assertThat(expectedFiles).hasSize(2);
```
##########
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java:
##########
@@ -139,9 +213,18 @@ public static List<DataFile> listPartition(
                 buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
               });
     } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
+      if (skipOnError) {
+        LOG.warn("Skipping unknown partition format: {} - {}", format, partitionUri);
+      } else {
+        throw new UnsupportedOperationException(
+            "Unknown partition format: "
Review Comment:
It would probably be better to use `String.format` here rather than string
concatenation.
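
Roughly something like the sketch below. `UnknownFormatError` is a hypothetical stand-in written in plain Java so it compiles on its own, and the message only carries the format, as the line being replaced did. Whether the new message should also include the partition URI is up to you.

```java
/** Hypothetical stand-in for the error path in listPartition; names are illustrative only. */
final class UnknownFormatError {
  private UnknownFormatError() {}

  /** Builds the message with String.format rather than the + operator. */
  static String message(String format) {
    return String.format("Unknown partition format: %s", format);
  }

  /** Creates the exception that the non-skipping branch would throw. */
  static UnsupportedOperationException forFormat(String format) {
    return new UnsupportedOperationException(message(format));
  }

  public static void main(String[] args) {
    System.out.println(forFormat("csv").getMessage());
  }
}
```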
##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCreateActions.java:
##########
@@ -406,6 +408,76 @@ public void testMigrateUnpartitioned() throws Exception {
     assertMigratedFileCount(SparkActions.get().migrateTable(source), source, dest);
   }
+  @Test
+  public void testMigrateSkipOnError() throws Exception {
+    Assume.assumeTrue("Cannot migrate to a hadoop based catalog", !type.equals("hadoop"));
+    Assume.assumeTrue(
+        "Can only migrate from Spark Session Catalog", catalog.name().equals("spark_catalog"));
+    String source = sourceName("test_migrate_skip_on_error_table");
+    String dest = source;
+
+    File location = temp.newFolder();
+    spark.sql(String.format(CREATE_PARQUET, source, location));
+    CatalogTable table = loadSessionTable(source);
+    Seq<String> partitionColumns = table.partitionColumnNames();
+    String format = table.provider().get();
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    spark
+        .table(baseTableName)
+        .write()
+        .mode(SaveMode.Append)
+        .format(format)
+        .partitionBy(partitionColumns.toSeq())
+        .saveAsTable(source);
+
+    List<File> expectedFiles = expectedFiles(source).collect(Collectors.toList());
+
+    Assert.assertEquals("Expected number of source files", 2, expectedFiles.size());
+
+    // Corrupt the second file
+    File file = expectedFiles.get(1);
+    Assume.assumeTrue("Delete source file!", file.delete());
+    Assume.assumeTrue("Create a empty source file!", file.createNewFile());
+
+    MigrateTable migrateAction = SparkActions.get().migrateTable(source);
+
+    AssertHelpers.assertThrows(
+        "Expected an exception",
+        RuntimeException.class,
+        "not a Parquet file (length is too low: 0)",
+        migrateAction::execute);
+
+    // skip files which cannot be imported into Iceberg
+    migrateAction = SparkActions.get().migrateTable(source).skipOnError();
+
+    MigrateTable.Result migratedFiles = migrateAction.execute();
+    validateTables(source, dest);
+
+    SparkTable destTable = loadTable(dest);
+    Assert.assertEquals(
+        "Provider should be iceberg",
+        "iceberg",
+        destTable.properties().get(TableCatalog.PROP_PROVIDER));
+    List<Row> actual = spark.table(dest).collectAsList();
+
+    Assert.assertEquals(
Review Comment:
I'd suggest using `Assertions.assertThat(actual).hasSize(3);` instead, as
that is easier to debug if the assertion ever fails.
For example, I've modified it locally to
`Assertions.assertThat(actual).hasSize(2);` and the output shows what the
actual content was, which makes debugging (especially on CI failures) much
easier.
```
java.lang.AssertionError:
Expected size: 2 but was: 3 in:
[[1,a], [2,b], [3,c]]
```
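
To show why this helps without pulling in a dependency, here is a hand-rolled stand-in in plain Java. This is not AssertJ's implementation: `SizeAssertion.assertHasSize` is a hypothetical helper that borrows the message shape from the output quoted above. A size-only comparison reports two numbers; including the collection itself tells you what was actually there.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a size assertion that reports the actual content on failure. */
final class SizeAssertion {
  private SizeAssertion() {}

  /** Fails with a message that includes the whole collection, not just its size. */
  static void assertHasSize(List<?> actual, int expected) {
    if (actual.size() != expected) {
      throw new AssertionError(
          String.format("Expected size: %d but was: %d in: %s", expected, actual.size(), actual));
    }
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("a", "b", "c");
    try {
      // Deliberately wrong expectation, to show the failure message.
      assertHasSize(rows, 2);
    } catch (AssertionError e) {
      System.out.println(e.getMessage());
    }
  }
}
```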
##########
data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java:
##########
@@ -139,9 +213,18 @@ public static List<DataFile> listPartition(
                 buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
               });
     } else {
-      throw new UnsupportedOperationException("Unknown partition format: " + format);
+      if (skipOnError) {
+        LOG.warn("Skipping unknown partition format: {} - {}", format, partitionUri);
+      } else {
+        throw new UnsupportedOperationException(
Review Comment:
It looks like this branch is never hit in the tests. I wonder how difficult
it would be to add a test that hits this exception.
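
As a rough idea of the two cases such a test would need to cover, here is a simplified sketch in plain Java. `ListPartitionSketch` is a hypothetical stand-in, not the real `TableMigrationUtil.listPartition`: it returns file names as strings, and the set of supported formats is an assumption based on the branches visible in this diff. A real test would call `listPartition` with an unsupported format, once with `skipOnError` and once without.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for the format dispatch in listPartition. */
final class ListPartitionSketch {
  // Assumption: the formats the real method knows how to read.
  private static final List<String> SUPPORTED = Arrays.asList("avro", "parquet", "orc");

  private ListPartitionSketch() {}

  static List<String> listPartition(String format, String partitionUri, boolean skipOnError) {
    if (SUPPORTED.contains(format)) {
      // Placeholder for building the DataFile entries of the partition.
      return Collections.singletonList(partitionUri + "/data." + format);
    }
    if (skipOnError) {
      // Unknown format is skipped: the partition contributes no files.
      return Collections.emptyList();
    }
    throw new UnsupportedOperationException(
        String.format("Unknown partition format: %s", format));
  }

  public static void main(String[] args) {
    // Case 1: skipOnError swallows the unknown format.
    System.out.println(listPartition("csv", "file:/tmp/p=1", true));

    // Case 2: without skipOnError the same input must throw.
    try {
      listPartition("csv", "file:/tmp/p=1", false);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```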
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]