kbendick commented on a change in pull request #3647:
URL: https://github.com/apache/iceberg/pull/3647#discussion_r762170768
##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -381,4 +383,31 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
expected.add(SimpleDataUtil.createRecord(2, "b"));
SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
}
+
+ @Test
+ public void testRewriteDataFilesMergeCheckpointInfo() throws Exception {
+ sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+ sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+ icebergTableUnPartitioned.refresh();
+
+ Map<String, String> summary = icebergTableUnPartitioned.currentSnapshot().summary();
+ String flinkJobIdBeforeRewrite = summary.get(SnapshotSummary.FLINK_JOB_ID);
+ String flinkMaxCommitedCheckpointIDBeforeRewrite = summary.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID);
+
+ Actions.forTable(icebergTableUnPartitioned)
+ .rewriteDataFiles()
+ .execute();
Review comment:
Nit: both of these lines are over-indented. They should be indented 4
spaces (just after the i in Actions).
##########
File path: flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -381,4 +383,31 @@ public void testRewriteAvoidRepeateCompress() throws IOException {
expected.add(SimpleDataUtil.createRecord(2, "b"));
SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, expected);
}
+
+ @Test
+ public void testRewriteDataFilesMergeCheckpointInfo() throws Exception {
+ sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+ sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+ icebergTableUnPartitioned.refresh();
+
+ Map<String, String> summary = icebergTableUnPartitioned.currentSnapshot().summary();
+ String flinkJobIdBeforeRewrite = summary.get(SnapshotSummary.FLINK_JOB_ID);
+ String flinkMaxCommitedCheckpointIDBeforeRewrite = summary.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID);
+
+ Actions.forTable(icebergTableUnPartitioned)
+ .rewriteDataFiles()
+ .execute();
+
+ icebergTableUnPartitioned.refresh();
+
+ Map<String, String> summaryAfterRewrite = icebergTableUnPartitioned.currentSnapshot().summary();
+
+ // Assert the flink job info retain after rewrite.
+ Assert.assertEquals("Should retain flinkJobId after rewrite",
+ flinkJobIdBeforeRewrite, summaryAfterRewrite.get(SnapshotSummary.FLINK_JOB_ID));
+ Assert.assertEquals("Should retain flinkMaxCommitedCheckpointID after rewrite",
+ flinkMaxCommitedCheckpointIDBeforeRewrite,
+ summaryAfterRewrite.get(SnapshotSummary.FLINK_MAX_COMMITTED_CHECKPOINT_ID));
Review comment:
Nit: same note here about indentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]