xx789633 commented on code in PR #1658:
URL: https://github.com/apache/fluss/pull/1658#discussion_r2332244449


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPosDeleteCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            // add pos-delete
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            // one UPDATE_BEFORE and one UPDATE_AFTER
+            assertReplicaStatus(t1Bucket, 5);
+            checkFileStatusInIcebergTable(t1, 3, true);
+
+            // trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            checkFileStatusInIcebergTable(t1, 2, false);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    @Test
+    void testPosDeleteDuringCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 3);
+
+            // add pos-delete and trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L),
+                                    
TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 
8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", 
DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", 
DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 
10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            // rewritten files should fail to commit due to conflict, add 
check here

Review Comment:
   Agree. I left a note here just as a reminder to add more checks. Should we
also check that the rewritten files have been deleted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@fluss.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to