gustavoatt commented on code in PR #4614:
URL: https://github.com/apache/iceberg/pull/4614#discussion_r857706460
##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1415,6 +1415,89 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
Assert.assertEquals("Rows must match", records, actualRecords);
}
+ @Test
+ public void testOverwritePartition() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+ Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ List<SimpleRecord> result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals(result, Lists.newArrayList(new SimpleRecord(1, "a")));
+
+ // Now, we try to overwrite the partition id=1
+ Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);
+ df2.select("id", "data").write()
+ .format("iceberg")
+ .mode("overwrite")
+ .option("overwrite-partitions", "id=1")
+ .save(loadLocation(tableIdentifier));
+
+ result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
Review Comment:
Done.
##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java:
##########
@@ -173,6 +185,38 @@ private void replacePartitions(WriterCommitMessage[] messages) {
commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
}
+ private void replaceOverwritePartitions(WriterCommitMessage[] messages) {
+ final Expression overwriteExpression = parseOverwritePartitionFilter(overwriteFilter);
+ Iterable<DataFile> files = files(messages);
+
+ if (!files.iterator().hasNext()) {
+ LOG.info("Static overwrite is empty, skipping commit");
+ return;
+ }
+
+ OverwriteFiles overwriteFiles = table.newOverwrite()
+ .overwriteByRowFilter(overwriteExpression);
Review Comment:
Done. I changed my editor to use the right indentation level.
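For context on the snippet above, a rough sketch of how the rest of this commit path could continue, mirroring the existing `replacePartitions(...)` earlier in this diff; the validation call and the commit description string are assumptions on my part, not the PR's final code:

```java
// Sketch only: continues the quoted hunk using fields and helpers the Writer
// already has in this diff (table, files(messages), commitOperation).
OverwriteFiles overwriteFiles = table.newOverwrite()
    .overwriteByRowFilter(overwriteExpression);

int numFiles = 0;
for (DataFile file : files) {
  numFiles += 1;
  overwriteFiles.addFile(file);  // stage each newly written data file
}

// OverwriteFiles offers this check to reject files whose rows fall outside
// the user-supplied filter; whether the PR actually uses it is an assumption.
overwriteFiles.validateAddedFilesMatchOverwriteFilter();

commitOperation(overwriteFiles, numFiles, "static overwrite by filter");
```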
##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1415,6 +1415,89 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
Assert.assertEquals("Rows must match", records, actualRecords);
}
+ @Test
+ public void testOverwritePartition() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+ Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ List<SimpleRecord> result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals(result, Lists.newArrayList(new SimpleRecord(1, "a")));
+
+ // Now, we try to overwrite the partition id=1
+ Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);
+ df2.select("id", "data").write()
+ .format("iceberg")
+ .mode("overwrite")
+ .option("overwrite-partitions", "id=1")
+ .save(loadLocation(tableIdentifier));
+
+ result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals(result, Lists.newArrayList(new SimpleRecord(1, "b")));
+ }
+
+ @Test
+ public void testOverwritePartitionFilterMultipleSpecs() {
+ TableIdentifier identifier = TableIdentifier.of("db", "table");
+ Schema schema = new Schema(
+ Types.NestedField.optional(1, "ds", Types.StringType.get()),
+ Types.NestedField.optional(2, "data", Types.IntegerType.get()));
+ PartitionSpec originalSpec = PartitionSpec.builderFor(schema).identity("ds").build();
+ Table table = createTable(identifier, schema, originalSpec);
+
+ List<Row> records = Lists.newArrayList(
+ RowFactory.create("2021-01-01", 1),
+ RowFactory.create("2021-01-02", 2)
+ );
+ StructType originalSparkSchema = SparkSchemaUtil.convert(schema);
+ Dataset<Row> inputDf = spark.createDataFrame(records, originalSparkSchema);
+
+ inputDf.select("ds", "data").write()
+ .format("iceberg")
+ .mode(SaveMode.Overwrite)
+ .save(loadLocation(identifier));
+
+
+ Dataset<Row> readInputDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(identifier));
+ Assert.assertEquals(2, readInputDf.count());
+
+ // Now let's alter the partition spec.
+ table.updateSpec().addField("data").commit();
+
+ List<Row> insertRecords = Lists.newArrayList(
+ RowFactory.create("2021-01-01", 100)
+ );
+ spark.createDataFrame(insertRecords, originalSparkSchema)
+ .select("ds", "data")
+ .write()
+ .format("iceberg")
+ .option("overwrite-partitions", "ds='2021-01-01'")
+ .mode(SaveMode.Overwrite)
Review Comment:
Makes sense. Done, although there are still more uses of `SaveMode` within
this class in other tests.
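For reference, the updated write presumably follows the other new tests in this hunk and uses the string mode rather than the `SaveMode` enum; this is my guess at the final form, not quoted from the updated diff:

```java
spark.createDataFrame(insertRecords, originalSparkSchema)
    .select("ds", "data")
    .write()
    .format("iceberg")
    .option("overwrite-partitions", "ds='2021-01-01'")
    .mode("overwrite")  // string literal instead of SaveMode.Overwrite
    .save(loadLocation(identifier));
```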
##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java:
##########
@@ -66,10 +71,13 @@
class Writer implements DataSourceWriter {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
+ private final SparkSession spark;
private final JavaSparkContext sparkContext;
private final Table table;
private final FileFormat format;
private final boolean replacePartitions;
+
Review Comment:
It slipped in there. Removed.
##########
spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java:
##########
@@ -112,8 +122,10 @@ public DataWriterFactory<InternalRow> createWriterFactory() {
@Override
public void commit(WriterCommitMessage[] messages) {
- if (replacePartitions) {
+ if (replacePartitions && overwriteFilter == null) {
replacePartitions(messages);
+ } else if (replacePartitions) {
Review Comment:
It should, but the `overwriteFilter != null` condition is always true at
this point, since otherwise we would have entered the branch at line 125. I added
the condition for readability.
Good point about what happens when the save mode is `append` but an overwrite
filter is given. I now throw an exception here when that happens. Should I throw
here, or rather in `IcebergSource` just before creating the writer?
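To make the discussion concrete, a sketch of the branching being described; the precondition text and where it lives are assumptions, since the question of whether to throw here or in `IcebergSource` is still open:

```java
@Override
public void commit(WriterCommitMessage[] messages) {
  // Reject the unsupported combination discussed above: an overwrite filter
  // supplied while the save mode is append (uses the relocated Guava
  // Preconditions already available in Iceberg).
  Preconditions.checkArgument(replacePartitions || overwriteFilter == null,
      "Cannot use overwrite-partitions with a non-overwrite save mode");

  if (replacePartitions && overwriteFilter == null) {
    // dynamic partition overwrite (existing behavior)
    replacePartitions(messages);
  } else if (replacePartitions) {
    // overwriteFilter != null is guaranteed here; overwrite only the
    // partitions matched by the user-supplied filter
    replaceOverwritePartitions(messages);
  } else {
    append(messages);
  }
}
```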
##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java:
##########
@@ -1415,6 +1415,89 @@ public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
Assert.assertEquals("Rows must match", records, actualRecords);
}
+ @Test
+ public void testOverwritePartition() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+ Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(loadLocation(tableIdentifier));
+
+ List<SimpleRecord> result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals(result, Lists.newArrayList(new SimpleRecord(1, "a")));
+
+ // Now, we try to overwrite the partition id=1
+ Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "b")), SimpleRecord.class);
+ df2.select("id", "data").write()
+ .format("iceberg")
+ .mode("overwrite")
+ .option("overwrite-partitions", "id=1")
+ .save(loadLocation(tableIdentifier));
+
+ result = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assert.assertEquals(result, Lists.newArrayList(new SimpleRecord(1, "b")));
+ }
+
+ @Test
+ public void testOverwritePartitionFilterMultipleSpecs() {
+ TableIdentifier identifier = TableIdentifier.of("db", "table");
+ Schema schema = new Schema(
+ Types.NestedField.optional(1, "ds", Types.StringType.get()),
+ Types.NestedField.optional(2, "data", Types.IntegerType.get()));
+ PartitionSpec originalSpec = PartitionSpec.builderFor(schema).identity("ds").build();
+ Table table = createTable(identifier, schema, originalSpec);
+
+ List<Row> records = Lists.newArrayList(
+ RowFactory.create("2021-01-01", 1),
+ RowFactory.create("2021-01-02", 2)
+ );
+ StructType originalSparkSchema = SparkSchemaUtil.convert(schema);
+ Dataset<Row> inputDf = spark.createDataFrame(records, originalSparkSchema);
+
+ inputDf.select("ds", "data").write()
+ .format("iceberg")
+ .mode(SaveMode.Overwrite)
+ .save(loadLocation(identifier));
+
+
+ Dataset<Row> readInputDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(identifier));
+ Assert.assertEquals(2, readInputDf.count());
+
+ // Now let's alter the partition spec.
+ table.updateSpec().addField("data").commit();
+
+ List<Row> insertRecords = Lists.newArrayList(
+ RowFactory.create("2021-01-01", 100)
+ );
+ spark.createDataFrame(insertRecords, originalSparkSchema)
+ .select("ds", "data")
+ .write()
+ .format("iceberg")
+ .option("overwrite-partitions", "ds='2021-01-01'")
+ .mode(SaveMode.Overwrite)
+ .save(loadLocation(identifier));
+
+ // When reading we should only see two rows still
+ readInputDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(identifier));
+ Assert.assertEquals(2, readInputDf.count());
Review Comment:
Agreed, done.
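In case it helps future readers, one way the strengthened assertion could look; this is hypothetical, not quoted from the updated diff, and checks actual row values rather than only the count:

```java
List<Row> rows = spark.read()
    .format("iceberg")
    .load(loadLocation(identifier))
    .orderBy("ds")
    .collectAsList();

Assert.assertEquals("Should still have two rows", 2, rows.size());
Assert.assertEquals("Filtered partition should be overwritten", 100, rows.get(0).getInt(1));
Assert.assertEquals("Other partition should be untouched", 2, rows.get(1).getInt(1));
```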
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]