Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2694493537


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -134,7 +158,11 @@ public 
RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
             }
         } else {
             if (currentLogScanner != null) {
-                ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT);
+                // force to complete records
+                if (reachTieringMaxDurationTables.contains(currentTableId)) {
+                    return forceCompleteTieringLogRecords();

Review Comment:
   There is a potential race condition when checking if a table has reached max 
duration. The condition on line 162 checks if the currentTableId is in 
reachTieringMaxDurationTables, but this set can be modified by the 
TieringSourceFetcherManager from a different thread (via 
handleTableReachTieringMaxDuration). Since reachTieringMaxDurationTables is not 
thread-safe (it's a regular HashSet), this could lead to race conditions. The 
set should be synchronized or use a concurrent collection.
   ```suggestion
                   synchronized (reachTieringMaxDurationTables) {
                       if 
(reachTieringMaxDurationTables.contains(currentTableId)) {
                           return forceCompleteTieringLogRecords();
                       }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java:
##########
@@ -138,8 +162,24 @@ public Builder<WriteResult> 
withPollTieringTableIntervalMs(long pollTieringTable
             return this;
         }
 

Review Comment:
   The builder method withTieringTableDurationMax on line 165 accepts a long 
parameter representing milliseconds but the method name doesn't clearly 
indicate the time unit. This could lead to confusion if a developer passes a 
value in seconds instead of milliseconds. Consider renaming it to 
withTieringTableDurationMaxMs to match the pattern used in 
withPollTieringTableIntervalMs (line 160), or add JavaDoc to clarify the 
expected unit.
   ```suggestion
   
           /**
            * Sets the maximum tiering table duration.
            *
            * @param tieringTableDurationMaxMs maximum duration in 
<strong>milliseconds</strong>
            *                                  for tiering table data before it 
is tiered.
            * @return this builder.
            */
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.ExceptionUtils;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static 
org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The IT case for tiering. */
+class TieringITCase extends FlinkTieringTestBase {
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkTieringTestBase.beforeAll();
+    }
+
+    @AfterAll
+    protected static void afterAll() throws Exception {
+        FlinkTieringTestBase.afterAll();
+    }
+
+    @BeforeEach
+    void beforeEach() {
+        execEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment()
+                        .setParallelism(1)
+                        .setRuntimeMode(RuntimeExecutionMode.STREAMING);
+    }
+
+    @Test
+    void testTieringReachMaxDuration() throws Exception {
+        TablePath logTablePath = TablePath.of("fluss", "logtable");
+        createTable(logTablePath, false);
+        TablePath pkTablePath = TablePath.of("fluss", "pktable");
+        long pkTableId = createTable(pkTablePath, true);
+
+        // write some records to log table
+        List<InternalRow> rows = new ArrayList<>();
+        int recordCount = 6;
+        for (int i = 0; i < recordCount; i++) {
+            rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
+        }
+        writeRows(logTablePath, rows, true);
+
+        rows = new ArrayList<>();
+        //  write 6 records to primary key table, each bucket should only 
contain few record
+        for (int i = 0; i < recordCount; i++) {
+            rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
+        }
+        writeRows(pkTablePath, rows, false);
+
+        waitUntilSnapshot(pkTableId, 3, 0);
+
+        // set tiering duration to a small value for testing purpose
+        Configuration lakeTieringConfig = new Configuration();
+        lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_MAX, 
Duration.ofSeconds(1));
+        lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, 
Duration.ofMillis(100));
+        JobClient jobClient = buildTieringJob(execEnv, lakeTieringConfig);
+
+        try {
+            // verify the tiered records is less than the table total record to
+            // make sure tiering is forced to complete when reach max duration
+            LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath);
+            long tieredRecords = countTieredRecords(logTableLakeSnapshot);
+            assertThat(tieredRecords).isLessThan(recordCount);
+
+            // verify the tiered records is less than the table total record to
+            // make sure tiering is forced to complete when reach max duration
+            LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath);
+            tieredRecords = countTieredRecords(pkTableLakeSnapshot);
+            assertThat(tieredRecords).isLessThan(recordCount);

Review Comment:
   The test assertion on lines 108 and 114 checks that tieredRecords is less 
than recordCount (6), which implicitly assumes that some but not all records 
will be tiered. However, this assumption is fragile and timing-dependent. 
Depending on the timing of when the max duration is reached, it's possible that 
all records could be tiered (if they're written fast enough) or none could be 
tiered (if the duration expires immediately). The test should either have more 
robust assertions or add additional synchronization to ensure the intended 
partial tiering behavior occurs.
   ```suggestion
               // verify the tiered records do not exceed the table total record
               // under the configured max duration
               LakeSnapshot logTableLakeSnapshot = 
waitLakeSnapshot(logTablePath);
               long tieredRecords = countTieredRecords(logTableLakeSnapshot);
               assertThat(tieredRecords).isLessThanOrEqualTo(recordCount);
   
               // verify the tiered records do not exceed the table total record
               // under the configured max duration
               LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath);
               tieredRecords = countTieredRecords(pkTableLakeSnapshot);
               assertThat(tieredRecords).isLessThanOrEqualTo(recordCount);
   ```



##########
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java:
##########
@@ -96,4 +115,154 @@ public TableDescriptor getTable(TablePath tablePath) {
             return tableByPath.get(tablePath);
         }
     }
+
+    private static class TestingPaimonTieringFactory
+            implements LakeTieringFactory<TestingPaimonWriteResult, 
TestPaimonCommittable> {
+
+        @Override
+        public LakeWriter<TestingPaimonWriteResult> createLakeWriter(
+                WriterInitContext writerInitContext) {
+            return new TestingPaimonWriter(writerInitContext.tableInfo());
+        }
+
+        @Override
+        public SimpleVersionedSerializer<TestingPaimonWriteResult> 
getWriteResultSerializer() {
+            return new TestingPaimonWriteResultSerializer();
+        }
+
+        @Override
+        public LakeCommitter<TestingPaimonWriteResult, TestPaimonCommittable> 
createLakeCommitter(
+                CommitterInitContext committerInitContext) throws IOException {
+            return new TestingPaimonCommitter();
+        }
+
+        @Override
+        public SimpleVersionedSerializer<TestPaimonCommittable> 
getCommittableSerializer() {
+            return new SimpleVersionedSerializer<TestPaimonCommittable>() {
+                @Override
+                public int getVersion() {
+                    return 0;
+                }
+
+                @Override
+                public byte[] serialize(TestPaimonCommittable obj) throws 
IOException {
+                    return new byte[0];
+                }
+
+                @Override
+                public TestPaimonCommittable deserialize(int version, byte[] 
serialized)
+                        throws IOException {
+                    return new TestPaimonCommittable();
+                }
+            };
+        }
+    }
+
+    private static class TestingPaimonWriter implements 
LakeWriter<TestingPaimonWriteResult> {
+
+        static ConfigOption<Duration> writePauseOption =
+                key("write-pause").durationType().noDefaultValue();
+
+        private int writtenRecords = 0;
+        private final Duration writePause;
+
+        private TestingPaimonWriter(TableInfo tableInfo) {
+            this.writePause = 
tableInfo.getCustomProperties().get(writePauseOption);
+        }
+
+        @Override
+        public void write(LogRecord record) throws IOException {
+            try {
+                if (writePause != null) {
+                    Thread.sleep(writePause.toMillis());
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while pausing before 
write", e);
+            }
+            writtenRecords += 1;
+        }
+
+        @Override
+        public TestingPaimonWriteResult complete() throws IOException {
+            return new TestingPaimonWriteResult(writtenRecords);
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+    }
+
+    private static class TestingPaimonWriteResult {
+        private final int writtenRecords;
+
+        public TestingPaimonWriteResult(int writtenRecords) {
+            this.writtenRecords = writtenRecords;
+        }

Review Comment:
   The TestingPaimonWriteResult class is missing a getter method for the 
writtenRecords field. This makes the field inaccessible to other components 
that might need to verify or use this data, which could be necessary for 
testing purposes.
   ```suggestion
           }
   
           public int getWrittenRecords() {
               return writtenRecords;
           }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
+import org.apache.fluss.flink.tiering.source.split.TieringSplit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The SplitFetcherManager for tiering source. This class is needed to help 
notify a table reaches
+ * to deadline of tiering to {@link TieringSplitReader}.
+ */
+public class TieringSourceFetcherManager<WriteResult>
+        extends SingleThreadFetcherManagerAdapter<
+                TableBucketWriteResult<WriteResult>, TieringSplit> {
+
+    public TieringSourceFetcherManager(
+            
FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+                    elementsQueue,
+            Supplier<SplitReader<TableBucketWriteResult<WriteResult>, 
TieringSplit>>
+                    splitReaderSupplier,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, configuration, 
splitFinishedHook);
+    }
+
+    public void markTableReachTieringMaxDuration(long tableId) {
+        if (!fetchers.isEmpty()) {
+            // The fetcher thread is still running. This should be the 
majority of the cases.
+            fetchers.values()
+                    .forEach(
+                            splitFetcher ->
+                                    
enqueueMarkTableReachTieringMaxDurationTask(
+                                            splitFetcher, tableId));
+        } else {
+            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> 
splitFetcher =
+                    createSplitFetcher();
+            enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
+            startFetcher(splitFetcher);
+        }
+    }
+
+    private void enqueueMarkTableReachTieringMaxDurationTask(
+            SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> 
splitFetcher,
+            long reachTieringDeadlineTable) {
+        splitFetcher.enqueueTask(
+                new SplitFetcherTask() {
+                    @Override
+                    public boolean run() {
+                        ((TieringSplitReader<WriteResult>) 
splitFetcher.getSplitReader())
+                                
.handleTableReachTieringMaxDuration(reachTieringDeadlineTable);

Review Comment:
   The cast on line 76 to TieringSplitReader is unsafe and could fail with a 
ClassCastException if the split reader is of a different type. While this may 
work in the current implementation, it would be better to either add a type 
check before casting or ensure type safety through generic constraints. 
Consider adding an interface that both types implement or add an instanceof 
check with appropriate error handling.
   ```suggestion
                           SplitReader<TableBucketWriteResult<WriteResult>, 
TieringSplit> splitReader =
                                   splitFetcher.getSplitReader();
                           if (splitReader instanceof TieringSplitReader) {
                               @SuppressWarnings("unchecked")
                               TieringSplitReader<WriteResult> 
tieringSplitReader =
                                       (TieringSplitReader<WriteResult>) 
splitReader;
                               
tieringSplitReader.handleTableReachTieringMaxDuration(
                                       reachTieringDeadlineTable);
                           } else {
                               throw new IllegalStateException(
                                       "Expected SplitReader of type "
                                               + 
TieringSplitReader.class.getName()
                                               + " but found "
                                               + (splitReader == null
                                                       ? "null"
                                                       : 
splitReader.getClass().getName()));
                           }
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java:
##########
@@ -92,29 +97,38 @@ static void afterAll() throws Exception {
             conn.close();
             conn = null;
         }
+
+        System.out.println("after all");

Review Comment:
   There's a debug print statement (System.out.println) that should be removed. 
This is likely leftover debugging code that doesn't belong in production or 
test code.
   ```suggestion
   
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.ExceptionUtils;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static 
org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The IT case for tiering. */
+class TieringITCase extends FlinkTieringTestBase {
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkTieringTestBase.beforeAll();
+    }
+
+    @AfterAll
+    protected static void afterAll() throws Exception {
+        FlinkTieringTestBase.afterAll();
+    }
+
+    @BeforeEach

Review Comment:
   This method overrides [FlinkTieringTestBase.beforeEach](1); it is advisable 
to add an Override annotation.
   ```suggestion
       @BeforeEach
       @Override
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +285,58 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = 
new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // force finish all splits
+        Iterator<Map.Entry<TableBucket, TieringSplit>> 
currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = 
currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                // get the current offset, timestamp that tiered so far
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_OFFSET
+                                // logEndOffset is equal to offset tiered + 1
+                                : logOffsetAndTimestamp.logOffset + 1;
+                long timestamp =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_TIMESTAMP
+                                : logOffsetAndTimestamp.timestamp;
+                TableBucketWriteResult<WriteResult> bucketWriteResult =
+                        completeLakeWriter(
+                                bucket, split.getPartitionName(), 
logEndOffset, timestamp);
+
+                if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+                    // when the log end offset is unknown, the write result 
must be
+                    // null, otherwise, we should throw exception directly to 
avoid data
+                    // inconsistent
+                    checkState(
+                            bucketWriteResult.writeResult() == null,
+                            "bucketWriteResult must be null when log end 
offset is unknown when tiering "
+                                    + split);
+                }
+
+                writeResults.put(bucket, bucketWriteResult);
+                finishedSplitIds.put(bucket, split.splitId());
+                LOG.info(
+                        "Split {} is forced to be finished due to tiering 
reach max duration.",
+                        split.splitId());
+                currentTieringSplitsIterator.remove();
+            }
+        }
+        reachTieringMaxDurationTables.remove(this.currentTableId);
+        mayFinishCurrentTable();
+        return new TableBucketWriteResultWithSplitIds(writeResults, 
finishedSplitIds);

Review Comment:
   The forceCompleteTieringLogRecords method only handles TieringLogSplit (line 
300 checks split.isTieringLogSplit()). However, if there are 
TieringSnapshotSplits in currentTableSplitsByBucket when max duration is 
reached, they will be skipped silently and remain in the map. This could lead 
to incomplete cleanup and potential resource leaks or hanging tiering 
operations. The method should handle both log splits and snapshot splits, or 
document why snapshot splits can be safely ignored during force completion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to