xabriel commented on a change in pull request #315: [WIP] Incremental
processing prototype
URL: https://github.com/apache/incubator-iceberg/pull/315#discussion_r358371808
##########
File path: core/src/test/java/org/apache/iceberg/TestIncrementalDataScan.java
##########
@@ -25,63 +25,122 @@
import java.util.List;
import java.util.Set;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestIncrementalDataScan extends TableTestBase {
- @Test
- public void testAppend() {
- // ManifestEntry.Existing flag is only set when manifests are merged
+
+ @Before
+ public void setupTableProperties() {
table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"2").commit();
- add(files("A")); // 0
- add(files("B"));
- add(files("C"));
- add(files("D"));
- add(files("E")); // 4
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 4));
-
- delete(files("C", "D", "E")); // 5
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 4));
- // Sine 5th is a snapshot with delete operation. It is ignored
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 5));
- Assert.assertEquals(Sets.newHashSet("D", "E"), appendsBetweenScan(2, 5));
- Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(3, 5));
- // Sine 5th is a snapshot with delete operation. It is ignored
- Assert.assertTrue(appendsBetweenScan(4, 5).isEmpty());
-
- add(files("F")); // 6
- add(files("G")); // 7
- add(files("H")); // 8
-
- // Idempotent scans - old identifiers still give back existing data
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 4));
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 5));
-
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "F", "G", "H"),
appendsBetweenScan(0, 8));
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "F", "G"),
appendsBetweenScan(0, 7));
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "F"),
appendsBetweenScan(0, 6));
}
@Test
- public void testReplace() {
- table.updateProperties().set(TableProperties.MANIFEST_MIN_MERGE_COUNT,
"2").commit();
- add(files("A")); // 0
- add(files("B"));
- add(files("C"));
- add(files("D"));
- add(files("E")); // 4
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 4));
-
- replace(files("A", "B", "C"), files("F", "G")); // 5
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(0, 5));
- // 5th snapshot was a replace. No new content was added
- Assert.assertTrue(appendsBetweenScan(4, 5).isEmpty());
- Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(3, 5));
-
- add(files("H"));
- add(files("I")); // 7
- Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "H", "I"),
appendsBetweenScan(0, 7));
- Assert.assertEquals(Sets.newHashSet("I"), appendsBetweenScan(6, 7));
- Assert.assertEquals(Sets.newHashSet("H", "I"), appendsBetweenScan(5, 7));
+ public void testInvalidScans() {
+ add(table.newAppend(), files("A"));
+ AssertHelpers.assertThrows(
+ "from and to snapshots cannot be the same, since from snapshot is
exclusive and not part of the scan",
+ IllegalArgumentException.class, "from and to snapshots cannot be the
same", () -> appendsBetweenScan(1, 1));
+ }
+
+ @Test
+ public void testAppends() {
+ add(table.newAppend(), files("A")); // 1
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C"));
+ add(table.newAppend(), files("D"));
+ add(table.newAppend(), files("E")); // 5
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(1, 5));
+ Assert.assertEquals(Sets.newHashSet("C", "D", "E"), appendsBetweenScan(2,
5));
+ }
+
+ @Test
+ public void testReplaceOverwritesDeletes() {
+ add(table.newAppend(), files("A")); // 1
+ add(table.newAppend(), files("B"));
+ add(table.newAppend(), files("C"));
+ add(table.newAppend(), files("D"));
+ add(table.newAppend(), files("E")); // 5
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E"),
appendsBetweenScan(1, 5));
+
+ replace(table.newRewrite(), files("A", "B", "C"), files("F", "G")); // 6
+ Assert.assertEquals("Replace commits are ignored", Sets.newHashSet("B",
"C", "D", "E"), appendsBetweenScan(1, 6));
+ Assert.assertEquals(Sets.newHashSet("E"), appendsBetweenScan(4, 6));
+ // 6th snapshot is a replace. No new content is added
+ Assert.assertTrue("Replace commits are ignored", appendsBetweenScan(5,
6).isEmpty());
+ delete(table.newDelete(), files("D")); // 7
+ // 7th snapshot is a delete.
+ Assert.assertTrue("Replace and delete commits are ignored",
appendsBetweenScan(5, 7).isEmpty());
+ Assert.assertTrue("Delete commits are ignored", appendsBetweenScan(6,
7).isEmpty());
+ // 8th snapshot is an overwrite
+ overwrite(table.newOverwrite(), files("H"), files("E"));
+
+ add(table.newAppend(), files("I")); // 9
+ // snapshots 6, 7 and 8 are ignored
+ Assert.assertEquals(Sets.newHashSet("B", "C", "D", "E", "I"),
appendsBetweenScan(1, 9));
Review comment:
I think we had a bit of this conversation elsewhere, but I wanted to point it out here.
Seeing this test gets me thinking: should we include a mode where deletes are
honored? Something like `appendsBetween(1, 9, Mode.HonorDeletes)`, which in
this case would return `Set("B", "C", "E", "I")` since `D` has been deleted.
This can be considered in a separate PR.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]