xx789633 commented on code in PR #1658:
URL: https://github.com/apache/fluss/pull/1658#discussion_r2332244449
##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPosDeleteCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            // add pos-delete
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            // one UPDATE_BEFORE and one UPDATE_AFTER
+            assertReplicaStatus(t1Bucket, 5);
+            checkFileStatusInIcebergTable(t1, 3, true);
+
+            // trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            checkFileStatusInIcebergTable(t1, 2, false);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    @Test
+    void testPosDeleteDuringCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 3);
+
+            // add pos-delete and trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            // rewritten files should fail to commit due to conflict, add check here

Review Comment:
   Agree. I left a note here just to remind you to add more checks. Should we also check that the latest Iceberg snapshot version is correct, and that the rewritten files have been deleted?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@fluss.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
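For concreteness, the extra checks the review comment asks about could look roughly like the sketch below. This is a minimal sketch, not the test's actual helpers: `checkRewriteCommitted`, `snapshotIdBeforeRewrite`, and `oldDataFilePaths` are hypothetical names introduced here, while the `Table`/`Snapshot` calls are standard Iceberg APIs. Note that a rewrite commit only drops the old data files from the new snapshot's view; the physical files are removed later by snapshot expiration or orphan-file cleanup.

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.HashSet;
import java.util.Set;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

// Hypothetical helper: icebergTable would be loaded from the test's catalog;
// snapshotIdBeforeRewrite and oldDataFilePaths are captured before the
// compaction is triggered.
private static void checkRewriteCommitted(
        Table icebergTable, long snapshotIdBeforeRewrite, Set<String> oldDataFilePaths)
        throws Exception {
    icebergTable.refresh();
    Snapshot current = icebergTable.currentSnapshot();

    // 1) The rewrite should have produced a new snapshot, committed as a
    //    "replace" operation on top of the pre-compaction snapshot.
    assertThat(current.snapshotId()).isNotEqualTo(snapshotIdBeforeRewrite);
    assertThat(current.operation()).isEqualTo(DataOperations.REPLACE);

    // 2) The rewritten data files should no longer be referenced by the
    //    current snapshot (physical deletion only happens on expiration).
    Set<String> liveFiles = new HashSet<>();
    try (CloseableIterable<FileScanTask> tasks = icebergTable.newScan().planFiles()) {
        for (FileScanTask task : tasks) {
            liveFiles.add(task.file().path().toString());
        }
    }
    assertThat(liveFiles).doesNotContainAnyElementsOf(oldDataFilePaths);
}
```

For `testPosDeleteDuringCompaction`, where the rewrite is expected to fail on conflict, the inverse could be asserted: the latest snapshot is the tiering delta commit rather than a `replace`, and the would-be rewritten files are still referenced.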