luoyuxia commented on code in PR #1658:
URL: https://github.com/apache/fluss/pull/1658#discussion_r2332115414


##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPosDeleteCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(

Review Comment:
   nit
   ```suggestion
                       Collections.singletonList
   ```
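   For readers skimming the nit: `Collections.singletonList` yields an immutable one-element list, which states the single-row intent more directly than `Arrays.asList`. A standalone sketch (not from the PR):
   ```java
   import java.util.Collections;
   import java.util.List;

   // singletonList returns an immutable one-element list; mutating it throws.
   List<String> one = Collections.singletonList("v1");
   // one.add("v2");  // would throw UnsupportedOperationException
   ```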



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java:
##########
@@ -82,10 +82,15 @@ public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) {
         return this;
     }
 
-    private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
+    private List<CombinedScanTask> planRewriteFileGroups(Long snapshotId) throws IOException {

Review Comment:
   nit
   ```suggestion
    private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
   ```
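   Worth noting why the primitive matters here: a boxed `Long` parameter invites an accidental auto-unboxing `NullPointerException`. A standalone sketch of the hazard (illustrative only, not from the PR):
   ```java
   // Auto-unboxing a null Long throws NPE at the unboxing site, far from
   // where the null originated; a primitive parameter rules this out.
   Long boxed = null;
   long safe = (boxed == null) ? -1L : boxed; // boxed form needs explicit handling
   // long unboxed = boxed; // would throw NullPointerException
   ```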



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java:
##########
@@ -162,11 +162,20 @@ private Long commitRewrite(
             Map<String, String> snapshotProperties) {
         icebergTable.refresh();
         RewriteFiles rewriteFiles = icebergTable.newRewrite();
-        for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
-            rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
-            rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
-        }
         try {
+            if (rewriteDataFileResults.stream()
+                            .map(RewriteDataFileResult::snapshotId)
+                            .distinct()
+                            .count()
+                    > 1) {
+                throw new IllegalArgumentException(
+                        "Rewrite data file results must have same snapshot id.");
+            }
+            rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId());
+            for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
+                rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+                rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+            }

Review Comment:
   Can you please help change
   https://github.com/apache/fluss/pull/1658/files#diff-fdc440a28ca4c3607311914ebc2c4d6f160654e8eff57e44624d60818a76860fR186
   to `.map(ContentFile::location)` in this PR.
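
   The linked line (R186) is not reproduced in this message; a hypothetical sketch of what the requested change could look like, assuming the surrounding code collects file paths for a log message via Iceberg's `ContentFile#location()`:
   ```java
   import java.util.stream.Collectors;
   import org.apache.iceberg.ContentFile;

   // Hypothetical context: `rewriteDataFileResult` is assumed in scope, as in
   // the loop above; location() yields each file's path for logging.
   String deletedLocations =
           rewriteDataFileResult.deletedDataFiles().stream()
                   .map(ContentFile::location)
                   .collect(Collectors.joining(", "));
   ```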



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPosDeleteCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =

Review Comment:
   This test feels a little complex. There's no need to cover all data types in this PR. Could we just use two columns (`int`, `string`) to keep the test simple?
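   A rough sketch of that simplification, assuming the table schema is cut down to an `int` key plus a `string` value (a `createPkTable` variant for that schema is hypothetical; `row`, `writeRows`, and the assertion are the helpers already used in this test):
   ```java
   // Hypothetical two-column row; far easier to scan than the 19-column literal.
   List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
   writeRows(t1, rows, false);
   assertReplicaStatus(t1Bucket, 1);
   ```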



##########
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java:
##########
@@ -44,6 +47,10 @@ public List<DataFile> addedDataFiles() {
         return addedDataFiles;
     }
 
+    public long snapshotId() {
+        return snapshotId;
+    }
+
     @Override
     public String toString() {
         return "RewriteDataFileResult{"

Review Comment:
   nit: `toString` also needs to include `snapshotId`.
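
   A sketch of the amended method; `snapshotId` is from this diff, while the other fields are assumed from the accessors used elsewhere in the PR:
   ```java
   @Override
   public String toString() {
       // Include snapshotId so rewrite results are traceable in logs.
       return "RewriteDataFileResult{"
               + "snapshotId=" + snapshotId
               + ", addedDataFiles=" + addedDataFiles
               + ", deletedDataFiles=" + deletedDataFiles
               + '}';
   }
   ```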



##########
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java:
##########
@@ -48,6 +54,277 @@ protected static void beforeAll() {
         execEnv.enableCheckpointing(1000);
     }
 
+    @Test
+    void testPosDeleteCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            // add pos-delete
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            // one UPDATE_BEFORE and one UPDATE_AFTER
+            assertReplicaStatus(t1Bucket, 5);
+            checkFileStatusInIcebergTable(t1, 3, true);
+
+            // trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            checkFileStatusInIcebergTable(t1, 2, false);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    @Test
+    void testPosDeleteDuringCompaction() throws Exception {
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+            long t1Id = createPkTable(t1, true);
+            TableBucket t1Bucket = new TableBucket(t1Id, 0);
+
+            List<InternalRow> rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    1,
+                                    1 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 1);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    2,
+                                    2 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 2);
+
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    3,
+                                    3 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 3);
+
+            // add pos-delete and trigger compaction
+            rows =
+                    Arrays.asList(
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v1",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
+                            row(
+                                    true,
+                                    (byte) 100,
+                                    (short) 200,
+                                    4,
+                                    4 + 400L,
+                                    500.1f,
+                                    600.0d,
+                                    "v2",
+                                    Decimal.fromUnscaledLong(900, 5, 2),
+                                    Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                    TimestampLtz.fromEpochMillis(1698235273400L),
+                                    TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                    TimestampNtz.fromMillis(1698235273501L),
+                                    TimestampNtz.fromMillis(1698235273501L, 8000),
+                                    new byte[] {5, 6, 7, 8},
+                                    TypeUtils.castFromString("2023-10-25", DataTypes.DATE()),
+                                    TypeUtils.castFromString("09:30:00.0", DataTypes.TIME()),
+                                    BinaryString.fromString("abc"),
+                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}));
+            writeRows(t1, rows, false);
+            assertReplicaStatus(t1Bucket, 6);
+            // rewritten files should fail to commit due to conflict, add check here

Review Comment:
   It seems we don't do any check here, right?
   I'd suggest:
   
   1. Verify the records in Iceberg to make sure tiering can still succeed even though compaction conflicts.
   2. Write again, but make sure the rows written won't cause any conflict, and check that compaction can happen successfully, to make sure previous compaction conflicts won't prevent further compaction.
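   
   A hypothetical sketch of those two checks, reusing the helpers already present in this test; the expected file counts and replica offsets are illustrative, not verified against the PR:
   ```java
   // 1) Tiering should still have succeeded even though the rewrite commit
   //    conflicted, so the delete files are still present and uncompacted.
   checkFileStatusInIcebergTable(t1, 4, true);

   // 2) Write a fresh, non-conflicting row (column values elided), then
   //    assert that the next compaction round commits cleanly.
   writeRows(t1, rows, false);
   assertReplicaStatus(t1Bucket, 7);
   checkFileStatusInIcebergTable(t1, 2, false);
   ```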



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@fluss.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
