szehon-ho commented on a change in pull request #2989:
URL: https://github.com/apache/iceberg/pull/2989#discussion_r701301837
##########
File path: flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
##########
@@ -259,4 +265,37 @@ public static StructLikeSet actualRowSet(Table table, Long snapshotId, String...
     return dataFiles;
   }
+
+  public static Map<Long, List<DataFile>> snapshotToDataFiles(
+      Table table)
+      throws IOException {
+    table.refresh();
+    Map<Long, List<DataFile>> res = Maps.newHashMap();
+    List<ManifestFile> manifestFiles = table.currentSnapshot().allManifests();
+    for (ManifestFile mf : manifestFiles) {
+      try (ManifestReader<DataFile> reader = ManifestFiles.read(mf, table.io())) {
+        List<DataFile> dataFiles = IteratorUtils.toList(reader.iterator());
+        if (res.containsKey(mf.snapshotId())) {
+          res.get(mf.snapshotId()).addAll(dataFiles);
+        } else {
+          res.put(mf.snapshotId(), dataFiles);
+        }
+      }
+    }
+    return res;
+  }
+
+  public static List<DataFile> matchingPartitions(
+      List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
+    Types.StructType spec = partitionSpec.partitionType();
+    Record partitionRecord = GenericRecord.create(spec).copy(partitionValues);
+    StructLikeWrapper expected = StructLikeWrapper

Review comment:
       That's a good idea, but when I tried it I found PartitionData is package-protected, so it can't be accessed here, unfortunately.
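       To make the workaround concrete, here is a minimal sketch of how that comparison can be completed with the public `StructLikeWrapper` API instead of `PartitionData` (the hunk above is truncated at the commented line, so the class name and exact shape below are my illustration, not necessarily what this PR ends up with):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeWrapper;

public class PartitionMatchingSketch {
  // Filter data files down to those whose partition tuple equals the given column -> value map.
  public static List<DataFile> matchingPartitions(
      List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
    Types.StructType partitionType = partitionSpec.partitionType();
    // Build the expected partition tuple as a GenericRecord, since PartitionData is not public.
    Record expectedRecord = GenericRecord.create(partitionType).copy(partitionValues);
    StructLikeWrapper expected = StructLikeWrapper.forType(partitionType).set(expectedRecord);
    // Reuse one wrapper for the candidates; set() replaces the wrapped struct each time.
    StructLikeWrapper candidate = StructLikeWrapper.forType(partitionType);
    return dataFiles.stream()
        .filter(file -> expected.equals(candidate.set(file.partition())))
        .collect(Collectors.toList());
  }
}
```

       The key point is that `StructLikeWrapper` supplies `equals()`/`hashCode()` over any `StructLike`, so a `GenericRecord` built from the expected values can be compared directly with each file's partition tuple.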
##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
##########
@@ -170,114 +171,121 @@ public void testOverwriteTable() throws Exception {
   public void testReplacePartitions() throws Exception {
     Assume.assumeFalse("Flink unbounded streaming does not support overwrite operation", isStreamingJob);
     String tableName = "test_partition";
-
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, format.name());
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    sql("INSERT INTO %s SELECT 1, 'a'", tableName);
-    sql("INSERT INTO %s SELECT 2, 'b'", tableName);
-    sql("INSERT INTO %s SELECT 3, 'c'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
-    sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(5, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(6, "a"),
-        SimpleDataUtil.createRecord(4, "b"),
-        SimpleDataUtil.createRecord(3, "c")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      sql("INSERT INTO %s SELECT 1, 'a'", tableName);
+      sql("INSERT INTO %s SELECT 2, 'b'", tableName);
+      sql("INSERT INTO %s SELECT 3, 'c'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s SELECT 4, 'b'", tableName);
+      sql("INSERT OVERWRITE %s SELECT 5, 'a'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(5, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+
+      sql("INSERT OVERWRITE %s PARTITION (data='a') SELECT 6", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(6, "a"),
+          SimpleDataUtil.createRecord(4, "b"),
+          SimpleDataUtil.createRecord(3, "c")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testInsertIntoPartition() throws Exception {
     String tableName = "test_insert_into_partition";
-
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH ('write.format.default'='%s')", tableName, format.name());
-    Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-
-    // Full partition.
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
-    sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
-    sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b")
-    ));
-
-    // Partial partition.
-    sql("INSERT INTO %s SELECT 4, 'c'", tableName);
-    sql("INSERT INTO %s SELECT 5, 'd'", tableName);
-
-    SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
-        SimpleDataUtil.createRecord(1, "a"),
-        SimpleDataUtil.createRecord(2, "a"),
-        SimpleDataUtil.createRecord(3, "b"),
-        SimpleDataUtil.createRecord(4, "c"),
-        SimpleDataUtil.createRecord(5, "d")
-    ));
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      Table partitionedTable = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+
+      // Full partition.
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 1", tableName);
+      sql("INSERT INTO %s PARTITION (data='a') SELECT 2", tableName);
+      sql("INSERT INTO %s PARTITION (data='b') SELECT 3", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b")
+      ));
+
+      // Partial partition.
+      sql("INSERT INTO %s SELECT 4, 'c'", tableName);
+      sql("INSERT INTO %s SELECT 5, 'd'", tableName);
+
+      SimpleDataUtil.assertTableRecords(partitionedTable, Lists.newArrayList(
+          SimpleDataUtil.createRecord(1, "a"),
+          SimpleDataUtil.createRecord(2, "a"),
+          SimpleDataUtil.createRecord(3, "b"),
+          SimpleDataUtil.createRecord(4, "c"),
+          SimpleDataUtil.createRecord(5, "d")
+      ));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
   }
 
   @Test
   public void testHashDistributeMode() throws Exception {
     String tableName = "test_hash_distribution_mode";
-
     Map<String, String> tableProps = ImmutableMap.of(
         "write.format.default", format.name(),
         TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName()
     );
     sql("CREATE TABLE %s(id INT, data VARCHAR) PARTITIONED BY (data) WITH %s", tableName, toWithClause(tableProps));
 
-    // Insert data set.
-    sql("INSERT INTO %s VALUES " +
-        "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
-        "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
-        "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
-
-    Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
-    SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
-        SimpleDataUtil.createRecord(1, "aaa"),
-        SimpleDataUtil.createRecord(1, "bbb"),
-        SimpleDataUtil.createRecord(1, "ccc"),
-        SimpleDataUtil.createRecord(2, "aaa"),
-        SimpleDataUtil.createRecord(2, "bbb"),
-        SimpleDataUtil.createRecord(2, "ccc"),
-        SimpleDataUtil.createRecord(3, "aaa"),
-        SimpleDataUtil.createRecord(3, "bbb"),
-        SimpleDataUtil.createRecord(3, "ccc")
-    ));
-
-    Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "aaa")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "bbb")).size());
-    Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1,
-        SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "ccc")).size());
-
-    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    try {
+      // Insert data set.
+      sql("INSERT INTO %s VALUES " +
+          "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " +
+          "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " +
+          "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName);
+
+      Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName));
+      SimpleDataUtil.assertTableRecords(table, ImmutableList.of(
+          SimpleDataUtil.createRecord(1, "aaa"),
+          SimpleDataUtil.createRecord(1, "bbb"),
+          SimpleDataUtil.createRecord(1, "ccc"),
+          SimpleDataUtil.createRecord(2, "aaa"),
+          SimpleDataUtil.createRecord(2, "bbb"),
+          SimpleDataUtil.createRecord(2, "ccc"),
+          SimpleDataUtil.createRecord(3, "aaa"),
+          SimpleDataUtil.createRecord(3, "bbb"),
+          SimpleDataUtil.createRecord(3, "ccc")
+      ));
+
+      // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval

Review comment:
       Updated comment
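       For anyone following along, the hunk above is truncated at the commented line. Roughly, the per-snapshot check that comment guards could look like the sketch below, built on the `snapshotToDataFiles`/`matchingPartitions` helpers added to `SimpleDataUtil` in this PR (the wrapper method name and assertion wording are my illustration, not necessarily the final test code):

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Assert;

public class PerSnapshotAssertionSketch {
  // Assert that every snapshot (i.e. every committed checkpoint) wrote exactly one
  // data file per partition, instead of asserting one file per partition table-wide.
  static void assertOneFilePerPartitionPerSnapshot(Table table) throws IOException {
    Map<Long, List<DataFile>> snapshotToDataFiles = SimpleDataUtil.snapshotToDataFiles(table);
    for (List<DataFile> dataFiles : snapshotToDataFiles.values()) {
      if (dataFiles.isEmpty()) {
        continue; // e.g. a snapshot that added no new data files
      }
      for (String value : new String[] {"aaa", "bbb", "ccc"}) {
        Assert.assertEquals("There should be 1 data file in partition '" + value + "'", 1,
            SimpleDataUtil.matchingPartitions(dataFiles, table.spec(), ImmutableMap.of("data", value)).size());
      }
    }
  }
}
```

       Checking files per snapshot rather than per table keeps the assertion stable even when the job crosses the auto checkpoint interval and commits more than one snapshot.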
- sql("INSERT INTO %s VALUES " + - "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " + - "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " + - "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName); - - Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); - SimpleDataUtil.assertTableRecords(table, ImmutableList.of( - SimpleDataUtil.createRecord(1, "aaa"), - SimpleDataUtil.createRecord(1, "bbb"), - SimpleDataUtil.createRecord(1, "ccc"), - SimpleDataUtil.createRecord(2, "aaa"), - SimpleDataUtil.createRecord(2, "bbb"), - SimpleDataUtil.createRecord(2, "ccc"), - SimpleDataUtil.createRecord(3, "aaa"), - SimpleDataUtil.createRecord(3, "bbb"), - SimpleDataUtil.createRecord(3, "ccc") - )); - - Assert.assertEquals("There should be only 1 data file in partition 'aaa'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "aaa")).size()); - Assert.assertEquals("There should be only 1 data file in partition 'bbb'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "bbb")).size()); - Assert.assertEquals("There should be only 1 data file in partition 'ccc'", 1, - SimpleDataUtil.partitionDataFiles(table, ImmutableMap.of("data", "ccc")).size()); - - sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName); + try { + // Insert data set. + sql("INSERT INTO %s VALUES " + + "(1, 'aaa'), (1, 'bbb'), (1, 'ccc'), " + + "(2, 'aaa'), (2, 'bbb'), (2, 'ccc'), " + + "(3, 'aaa'), (3, 'bbb'), (3, 'ccc')", tableName); + + Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, tableName)); + SimpleDataUtil.assertTableRecords(table, ImmutableList.of( + SimpleDataUtil.createRecord(1, "aaa"), + SimpleDataUtil.createRecord(1, "bbb"), + SimpleDataUtil.createRecord(1, "ccc"), + SimpleDataUtil.createRecord(2, "aaa"), + SimpleDataUtil.createRecord(2, "bbb"), + SimpleDataUtil.createRecord(2, "ccc"), + SimpleDataUtil.createRecord(3, "aaa"), + SimpleDataUtil.createRecord(3, "bbb"), + SimpleDataUtil.createRecord(3, "ccc") + )); + + // Sometimes we will have more than one checkpoint if we pass the auto checkpoint interval Review comment: Updated comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org