marton-bod commented on a change in pull request #3377:
URL: https://github.com/apache/iceberg/pull/3377#discussion_r737200653
##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -505,4 +517,91 @@ public void testFanoutDataWriterMultiplePartitions() throws IOException {
);
Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
}
+
+ @Test
+ public void testFanoutDeleteWriterMultipleSpecs() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // insert some unpartitioned data
+ ImmutableList<T> unpartRows = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "bbb"),
+ toRow(3, "ccc")
+ );
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, unpartRows,
table.spec(), null);
+ table.newFastAppend()
+ .appendFile(dataFile1)
+ .commit();
+
+ // identity partition using the 'data' column
+ table.updateSpec().addField("data").commit();
+ // insert some partitioned data
+ ImmutableList<T> identityRows1 = ImmutableList.of(
+ toRow(4, "fff"),
+ toRow(5, "fff"),
+ toRow(6, "fff")
+ );
+ ImmutableList<T> identityRows2 = ImmutableList.of(
+ toRow(7, "rrr"),
+ toRow(8, "rrr"),
+ toRow(9, "rrr")
+ );
+ DataFile dataFile2 =
+ writeData(writerFactory, fileFactory, identityRows1, table.spec(),
partitionKey(table.spec(), "fff"));
+ DataFile dataFile3 =
+ writeData(writerFactory, fileFactory, identityRows2, table.spec(),
partitionKey(table.spec(), "rrr"));
+ table.newFastAppend()
+ .appendFile(dataFile2)
+ .appendFile(dataFile3)
+ .commit();
+
+ // switch to using bucket partitioning on the 'data' column
+ table.updateSpec().removeField("data").addField(Expressions.bucket("data",
16)).commit();
+ // insert some data
+ ImmutableList<T> bucketedRows = ImmutableList.of(
+ toRow(10, "rrr"),
+ toRow(11, "rrr"),
+ toRow(12, "rrr")
+ );
+ DataFile dataFile4 =
+ writeData(writerFactory, fileFactory, bucketedRows, table.spec(),
partitionKey(table.spec(), "rrr"));
+ table.newFastAppend()
+ .appendFile(dataFile4)
+ .commit();
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec identitySpec = table.specs().get(1);
+ PartitionSpec bucketedSpec = table.specs().get(2);
+
+ // delete some records
+ FanoutPositionDeleteWriter<T> writer =
+ new FanoutPositionDeleteWriter<>(writerFactory, fileFactory,
table.io(), TARGET_FILE_SIZE);
+ writer.write(positionDelete(dataFile1.path(), 0L, null),
unpartitionedSpec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec,
partitionKey(identitySpec, "fff"));
+ writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec,
partitionKey(identitySpec, "fff"));
+ writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec,
partitionKey(identitySpec, "rrr"));
+ writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec,
partitionKey(bucketedSpec, "rrr"));
+ // pepper in some out-of-order spec deletes, which shouldn't cause
problems for fanout writer
+ writer.write(positionDelete(dataFile1.path(), 1L, null),
unpartitionedSpec, null);
Review comment:
Good idea
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]