szehon-ho commented on a change in pull request #2989:
URL: https://github.com/apache/iceberg/pull/2989#discussion_r701301837



##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -259,4 +265,37 @@ public static StructLikeSet actualRowSet(Table table, Long 
snapshotId, String...
 
     return dataFiles;
   }
+
+  public static Map<Long, List<DataFile>> snapshotToDataFiles(
+      Table table)
+      throws IOException {
+    table.refresh();
+    Map<Long, List<DataFile>> res = Maps.newHashMap();
+    List<ManifestFile> manifestFiles = table.currentSnapshot().allManifests();
+    for (ManifestFile mf : manifestFiles) {
+      try (ManifestReader<DataFile> reader = ManifestFiles.read(mf, 
table.io())) {
+        List<DataFile> dataFiles = IteratorUtils.toList(reader.iterator());
+        if (res.containsKey(mf.snapshotId())) {
+          res.get(mf.snapshotId()).addAll(dataFiles);
+        } else {
+          res.put(mf.snapshotId(), dataFiles);
+        }
+      }
+    }
+    return res;
+  }
+
+  public static List<DataFile> matchingPartitions(
+      List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, 
Object> partitionValues) {
+    Types.StructType spec = partitionSpec.partitionType();
+    Record partitionRecord = GenericRecord.create(spec).copy(partitionValues);
+    StructLikeWrapper expected = StructLikeWrapper

Review comment:
       That's a good idea, but when I tried it I found that PartitionData is 
package-private, so unfortunately it can't be accessed from here.

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -170,114 +171,121 @@ public void testOverwriteTable() throws Exception {
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse("Flink unbounded streaming does not support overwrite 
operation", isStreamingJob);
     String tableName = "test_partition";
-
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH 
('write.format.default'='%s')",
         tableName, format.name());
 
-    Table partitionedTable = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    sql("INSERT INTO %s SELECT 1, 'a'", tableName);
-    sql("INSERT INTO %s SELECT 2, 'b'", tableName);
-    sql("INSERT INTO %s SELECT 3, 'c'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
-    sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(5, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(6, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      sql("INSERT INTO %s SELECT 1, 'a'", tableName);
+      sql("INSERT INTO %s SELECT 2, 'b'", tableName);
+      sql("INSERT INTO %s SELECT 3, 'c'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
+      sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(5, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(6, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testInsertIntoPartition() throws Exception {
     String tableName = "test_insert_into_partition";
-
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH 
('write.format.default'='%s')",
         tableName, format.name());
 
-    Table partitionedTable = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    // Full partition.
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
-    sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b")
-    ));
-
-    // Partial partition.
-    sql("INSERT INTO %s SELECT 4, 'c'", tableName);
-    sql("INSERT INTO %s SELECT 5, 'd'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b"),
-        SimpleDataUtil.createRecord(4, "c"),
-        SimpleDataUtil.createRecord(5, "d")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      // Full partition.
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
+      sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b")
+      ));
+
+      // Partial partition.
+      sql("INSERT INTO %s SELECT 4, 'c'", tableName);
+      sql("INSERT INTO %s SELECT 5, 'd'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b"),
+          SimpleDataUtil.createRecord(4, "c"),
+          SimpleDataUtil.createRecord(5, "d")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testHashDistributeMode() throws Exception {
     String tableName = "test_hash_distribution_mode";
-
     Map<String, String> tableProps = ImmutableMap.of(
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, 
DistributionMode.HASH.modeName()
     );
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s",
         tableName, toWithClause(tableProps));
 
-    // Insert data set.
-    sql("INSERT INTO %s VALUES " +
-        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
-
-    Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
-        SimpleDataUtil.createRecord(1, "aaa"),
-        SimpleDataUtil.createRecord(1, "bbb"),
-        SimpleDataUtil.createRecord(1, "ccc"),
-        SimpleDataUtil.createRecord(2, "aaa"),
-        SimpleDataUtil.createRecord(2, "bbb"),
-        SimpleDataUtil.createRecord(2, "ccc"),
-        SimpleDataUtil.createRecord(3, "aaa"),
-        SimpleDataUtil.createRecord(3, "bbb"),
-        SimpleDataUtil.createRecord(3, "ccc")
-    ));
-
-    Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 
1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", 
"aaa")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 
1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", 
"bbb")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 
1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", 
"ccc")).size());
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      // Insert data set.
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+      Table table = 
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+      SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+          SimpleDataUtil.createRecord(1, "aaa"),
+          SimpleDataUtil.createRecord(1, "bbb"),
+          SimpleDataUtil.createRecord(1, "ccc"),
+          SimpleDataUtil.createRecord(2, "aaa"),
+          SimpleDataUtil.createRecord(2, "bbb"),
+          SimpleDataUtil.createRecord(2, "ccc"),
+          SimpleDataUtil.createRecord(3, "aaa"),
+          SimpleDataUtil.createRecord(3, "bbb"),
+          SimpleDataUtil.createRecord(3, "ccc")
+      ));
+
+      // Sometimes we will have more than one checkpoint if we pass the auto 
checkpoint interval

Review comment:
       Updated comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to