jackye1995 commented on a change in pull request #3377:
URL: https://github.com/apache/iceberg/pull/3377#discussion_r736967765
##########
File path: data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
##########
@@ -505,4 +517,91 @@ public void testFanoutDataWriterMultiplePartitions()
throws IOException {
);
Assert.assertEquals("Records should match", toSet(expectedRows),
actualRowSet("*"));
}
+
+ @Test
+ public void testFanoutDeleteWriterMultipleSpecs() throws IOException {
+ FileWriterFactory<T> writerFactory = newWriterFactory(table.schema());
+
+ // insert some unpartitioned data
+ ImmutableList<T> unpartRows = ImmutableList.of(
+ toRow(1, "aaa"),
+ toRow(2, "bbb"),
+ toRow(3, "ccc")
+ );
+ DataFile dataFile1 = writeData(writerFactory, fileFactory, unpartRows,
table.spec(), null);
+ table.newFastAppend()
+ .appendFile(dataFile1)
+ .commit();
+
+ // identity partition using the 'data' column
+ table.updateSpec().addField("data").commit();
+ // insert some partitioned data
+ ImmutableList<T> identityRows1 = ImmutableList.of(
+ toRow(4, "fff"),
+ toRow(5, "fff"),
+ toRow(6, "fff")
+ );
+ ImmutableList<T> identityRows2 = ImmutableList.of(
+ toRow(7, "rrr"),
+ toRow(8, "rrr"),
+ toRow(9, "rrr")
+ );
+ DataFile dataFile2 =
+ writeData(writerFactory, fileFactory, identityRows1, table.spec(),
partitionKey(table.spec(), "fff"));
+ DataFile dataFile3 =
+ writeData(writerFactory, fileFactory, identityRows2, table.spec(),
partitionKey(table.spec(), "rrr"));
+ table.newFastAppend()
+ .appendFile(dataFile2)
+ .appendFile(dataFile3)
+ .commit();
+
+ // switch to using bucket partitioning on the 'data' column
+ table.updateSpec().removeField("data").addField(Expressions.bucket("data",
16)).commit();
+ // insert some data
+ ImmutableList<T> bucketedRows = ImmutableList.of(
+ toRow(10, "rrr"),
+ toRow(11, "rrr"),
+ toRow(12, "rrr")
+ );
+ DataFile dataFile4 =
+ writeData(writerFactory, fileFactory, bucketedRows, table.spec(),
partitionKey(table.spec(), "rrr"));
+ table.newFastAppend()
+ .appendFile(dataFile4)
+ .commit();
+
+ PartitionSpec unpartitionedSpec = table.specs().get(0);
+ PartitionSpec identitySpec = table.specs().get(1);
+ PartitionSpec bucketedSpec = table.specs().get(2);
+
+ // delete some records
+ FanoutPositionDeleteWriter<T> writer =
+ new FanoutPositionDeleteWriter<>(writerFactory, fileFactory,
table.io(), TARGET_FILE_SIZE);
+ writer.write(positionDelete(dataFile1.path(), 0L, null),
unpartitionedSpec, null);
+ writer.write(positionDelete(dataFile2.path(), 0L, null), identitySpec,
partitionKey(identitySpec, "fff"));
+ writer.write(positionDelete(dataFile2.path(), 1L, null), identitySpec,
partitionKey(identitySpec, "fff"));
+ writer.write(positionDelete(dataFile3.path(), 2L, null), identitySpec,
partitionKey(identitySpec, "rrr"));
+ writer.write(positionDelete(dataFile4.path(), 0L, null), bucketedSpec,
partitionKey(bucketedSpec, "rrr"));
+ // pepper in some out-of-order spec deletes, which shouldn't cause
problems for fanout writer
+ writer.write(positionDelete(dataFile1.path(), 1L, null),
unpartitionedSpec, null);
Review comment:
This tests out-of-order specs; could you also cover out-of-order
partitions by swapping L581 and L582?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]