luoyuxia commented on code in PR #1658: URL: https://github.com/apache/fluss/pull/1658#discussion_r2332115414
########## fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java: ##########

@@ -48,6 +54,277 @@ protected static void beforeAll() { execEnv.enableCheckpointing(1000); } + @Test + void testPosDeleteCompaction() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + try { + TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1"); + long t1Id = createPkTable(t1, true); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + List<InternalRow> rows = + Arrays.asList(

Review Comment:
nit
```suggestion
Collections.singletonList
```

########## fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java: ##########

@@ -82,10 +82,15 @@ public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) { return this; } - private List<CombinedScanTask> planRewriteFileGroups() throws IOException { + private List<CombinedScanTask> planRewriteFileGroups(Long snapshotId) throws IOException {

Review Comment:
nit
```suggestion
private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
```

########## fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java: ##########

@@ -162,11 +162,20 @@ private Long commitRewrite( Map<String, String> snapshotProperties) { icebergTable.refresh(); RewriteFiles rewriteFiles = icebergTable.newRewrite(); - for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) { - rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile); - rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile); - } try { + if (rewriteDataFileResults.stream() + .map(RewriteDataFileResult::snapshotId) + .distinct() + .count() + > 1) { + throw new IllegalArgumentException( + "Rewrite data file results must have same snapshot id."); + } + rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId()); + for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) { + rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile); + rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile); + }

Review Comment:
Could you please also change https://github.com/apache/fluss/pull/1658/files#diff-fdc440a28ca4c3607311914ebc2c4d6f160654e8eff57e44624d60818a76860fR186 to `.map(ContentFile::location)` in this PR?

########## fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java: ##########

@@ -48,6 +54,277 @@ protected static void beforeAll() { execEnv.enableCheckpointing(1000); } + @Test + void testPosDeleteCompaction() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + try { + TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1"); + long t1Id = createPkTable(t1, true); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + List<InternalRow> rows =

Review Comment:
This test feels a bit complex to me, and we don't need to cover every data type in this PR. Could we use just two columns (`int`, `string`) to keep the test simple?
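For illustration only, a minimal sketch of what the simplified setup could look like, assuming a hypothetical two-column `(id INT PRIMARY KEY, name STRING)` schema and reusing the `row(...)`, `writeRows(...)` and `assertReplicaStatus(...)` helpers already used in `IcebergRewriteITCase`:

```java
// Sketch under the assumption of a simplified (int, string) primary-key schema;
// the helper methods are the ones this test class already uses.
List<InternalRow> rows = Collections.singletonList(row(1, BinaryString.fromString("v1")));
writeRows(t1, rows, false);
assertReplicaStatus(t1Bucket, 1);

// Upserting the same key produces the pos-delete the test needs.
rows = Collections.singletonList(row(1, BinaryString.fromString("v2")));
writeRows(t1, rows, false);
// one UPDATE_BEFORE and one UPDATE_AFTER on top of the initial insert
assertReplicaStatus(t1Bucket, 3);
```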
########## fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java: ##########

@@ -44,6 +47,10 @@ public List<DataFile> addedDataFiles() { return addedDataFiles; } + public long snapshotId() { + return snapshotId; + } + @Override public String toString() { return "RewriteDataFileResult{"

Review Comment:
nit: `toString` also needs to include `snapshotId`

########## fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java: ##########

@@ -48,6 +54,277 @@ protected static void beforeAll() { execEnv.enableCheckpointing(1000); } + @Test + void testPosDeleteCompaction() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + try { + TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1"); + long t1Id = createPkTable(t1, true); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + List<InternalRow> rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 1, + 1 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 1); + + rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 2, + 2 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 2); + + // add pos-delete + rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 3, + 3 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + row( + true, + (byte) 100, + (short) 200, + 3, + 3 + 400L, + 500.1f, + 600.0d, + "v2", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), +
BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + // one UPDATE_BEFORE and one UPDATE_AFTER + assertReplicaStatus(t1Bucket, 5); + checkFileStatusInIcebergTable(t1, 3, true); + + // trigger compaction + rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 4, + 4 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 6); + checkFileStatusInIcebergTable(t1, 2, false); + } finally { + jobClient.cancel().get(); + } + } + + @Test + void testPosDeleteDuringCompaction() throws Exception { + JobClient jobClient = buildTieringJob(execEnv); + try { + TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2"); + long t1Id = createPkTable(t1, true); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + List<InternalRow> rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 1, + 1 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 1); + + rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 2, + 2 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 2); + + rows = + Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 3, + 3 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 3); + + // add pos-delete and trigger compaction + rows = + 
Arrays.asList( + row( + true, + (byte) 100, + (short) 200, + 4, + 4 + 400L, + 500.1f, + 600.0d, + "v1", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + row( + true, + (byte) 100, + (short) 200, + 4, + 4 + 400L, + 500.1f, + 600.0d, + "v2", + Decimal.fromUnscaledLong(900, 5, 2), + Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0), + TimestampLtz.fromEpochMillis(1698235273400L), + TimestampLtz.fromEpochMillis(1698235273400L, 7000), + TimestampNtz.fromMillis(1698235273501L), + TimestampNtz.fromMillis(1698235273501L, 8000), + new byte[] {5, 6, 7, 8}, + TypeUtils.castFromString("2023-10-25", DataTypes.DATE()), + TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()), + BinaryString.fromString("abc"), + new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10})); + writeRows(t1, rows, false); + assertReplicaStatus(t1Bucket, 6); + // rewritten files should fail to commit due to conflict, add check here

Review Comment:
It seems we don't do any check here, right? I'd suggest:
1. Verify the records in Iceberg to make sure tiering still succeeds even though the compaction conflicts.
2. Write again, making sure the new rows don't cause any conflict, and check that this compaction succeeds, so that a previous compaction conflict doesn't prevent further compactions.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@fluss.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
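For what it's worth, a rough sketch of the two suggested follow-up checks, assuming the test is also simplified to two `(int, string)` columns as proposed earlier; the expected file counts are illustrative assumptions, and the helpers are the ones this test class already uses:

```java
// 1. Tiering should still have succeeded even though the rewrite commit conflicted:
//    the conflicting compaction is dropped, so the data files and the pos-delete remain.
checkFileStatusInIcebergTable(t1, 4, true); // illustrative counts, not verified

// 2. Write again in a way that cannot conflict with the rewritten files, then verify
//    that a later compaction commits, i.e. the earlier conflict does not block rewrites.
rows = Collections.singletonList(row(5, BinaryString.fromString("v1")));
writeRows(t1, rows, false);
assertReplicaStatus(t1Bucket, 7);
checkFileStatusInIcebergTable(t1, 2, false); // pos-deletes compacted away, illustrative
```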