Copilot commented on code in PR #1725:
URL: https://github.com/apache/fluss/pull/1725#discussion_r2361994359


##########
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java:
##########
@@ -92,97 +92,124 @@ void testTiering() throws Exception {
         // then start tiering job
         JobClient jobClient = buildTieringJob(execEnv);
 
-        // check the status of replica after synced
-        assertReplicaStatus(t1Bucket, 3);
-        // check data in paimon
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-        // check snapshot property in paimon
-        Map<String, String> properties =
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                "[{\"bucket_id\":0,\"log_offset\":3}]");
-                    }
-                };
-        checkSnapshotPropertyInPaimon(t1, properties);
-
-        // then, create another log table
-        TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
-        long t2Id = createLogTable(t2);
-        TableBucket t2Bucket = new TableBucket(t2Id, 0);
-        List<InternalRow> flussRows = new ArrayList<>();
-        // write records
-        for (int i = 0; i < 10; i++) {
-            rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
-            flussRows.addAll(rows);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(t1Bucket, 3);
+            // check data in paimon
+            checkDataInPaimonPrimayKeyTable(t1, rows);
+            // check snapshot property in paimon
+            Map<String, String> properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "[{\"bucket_id\":0,\"log_offset\":3}]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(t1, properties);
+
+            // then, create another log table
+            TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+            long t2Id = createLogTable(t2);
+            TableBucket t2Bucket = new TableBucket(t2Id, 0);
+            List<InternalRow> flussRows = new ArrayList<>();
+            // write records
+            for (int i = 0; i < 10; i++) {
+                rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+                flussRows.addAll(rows);
+                // write records
+                writeRows(t2, rows, true);
+            }
+            // check the status of replica after synced;
+            // note: we can't update log start offset for unaware bucket mode log table
+            assertReplicaStatus(t2Bucket, 30);
+
+            // check data in paimon
+            checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+
+            // then write data to the pk tables
             // write records
-            writeRows(t2, rows, true);
+            rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+            // write records
+            writeRows(t1, rows, false);
+
+            // check the status of replica of t2 after synced
+            // not check start offset since we won't
+            // update start log offset for primary key table
+            assertReplicaStatus(t1Bucket, 9);
+
+            checkDataInPaimonPrimayKeyTable(t1, rows);
+
+            // then create partitioned table and wait partitions are ready
+            TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable");
+            Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+                    createPartitionedTable(partitionedTablePath);
+            Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
+
+            // now, write rows into partitioned table
+            TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
+            Map<String, List<InternalRow>> writtenRowsByPartition =
+                    writeRowsIntoPartitionedTable(
+                            partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
+            long tableId = tableIdAndDescriptor.f0;
+
+            // wait until synced to paimon
+            for (Long partitionId : partitionNameByIds.keySet()) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+                assertReplicaStatus(tableBucket, 3);
+            }
+
+            // now, let's check data in paimon per partition
+            // check data in paimon
+            String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
+            for (String partitionName : partitionNameByIds.values()) {
+                checkDataInPaimonAppendOnlyPartitionedTable(
+                        partitionedTablePath,
+                        Collections.singletonMap(partitionCol, partitionName),
+                        writtenRowsByPartition.get(partitionName),
+                        0);
+            }
+
+            properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    "["
+                                            + "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+                                            + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+                                            + "]");
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+        } finally {
+            jobClient.cancel().get();
         }
-        // check the status of replica after synced;
-        // note: we can't update log start offset for unaware bucket mode log table
-        assertReplicaStatus(t2Bucket, 30);
-
-        // check data in paimon
-        checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);
+    }
 
-        // then write data to the pk tables
-        // write records
-        rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+    @Test
+    void testTieringToDvEnabledTable() throws Exception {
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
+        long t1Id =
+                createPkTable(
+                        t1,
+                        Collections.singletonMap("table.datalake.auto-compaction", "true"),
+                        Collections.singletonMap("paimon.deletion-vectors.enabled", "true"));
         // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
         writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
 
-        // check the status of replica of t2 after synced
-        // not check start offset since we won't
-        // update start log offset for primary key table
-        assertReplicaStatus(t1Bucket, 9);
-
-        checkDataInPaimonPrimayKeyTable(t1, rows);
-
-        // then create partitioned table and wait partitions are ready
-        TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable");
-        Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
-                createPartitionedTable(partitionedTablePath);
-        Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
-
-        // now, write rows into partitioned table
-        TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
-        Map<String, List<InternalRow>> writtenRowsByPartition =
-                writeRowsIntoPartitionedTable(
-                        partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
-        long tableId = tableIdAndDescriptor.f0;
-
-        // wait until synced to paimon
-        for (Long partitionId : partitionNameByIds.keySet()) {
-            TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
-            assertReplicaStatus(tableBucket, 3);
-        }
-
-        // now, let's check data in paimon per partition
-        // check data in paimon
-        String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
-        for (String partitionName : partitionNameByIds.values()) {
-            checkDataInPaimonAppendOnlyPartitionedTable(
-                    partitionedTablePath,
-                    Collections.singletonMap(partitionCol, partitionName),
-                    writtenRowsByPartition.get(partitionName),
-                    0);
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+        try {
+            // check the status of replica after synced
+            assertReplicaStatus(new TableBucket(t1Id, 0), 3);
+            // check data in paimon
+            checkDataInPaimonPrimayKeyTable(t1, rows);

Review Comment:
   Method name has a typo: 'PrimayKeyTable' should be 'PrimaryKeyTable'.
   ```suggestion
               checkDataInPaimonPrimaryKeyTable(t1, rows);
   ```



##########
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java:
##########
@@ -55,8 +60,20 @@ public MergeTreeWriter(
     }
 
     private static TableWriteImpl<KeyValue> createTableWrite(FileStoreTable fileStoreTable) {
+        // we allow users to configure the temporary directory used by fluss tiering
+        // since the default java.io.tmpdir may not suitable.

Review Comment:
   Grammar error: 'may not suitable' should be 'may not be suitable'.
   ```suggestion
           // since the default java.io.tmpdir may not be suitable.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to