Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2646896556


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +285,58 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // force finish all splits
+        Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                // get the current offset, timestamp that tiered so far
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_OFFSET
+                                // logEngOffset is equal to offset tiered + 1
+                                : logOffsetAndTimestamp.logOffset + 1;
+                long timestamp =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_TIMESTAMP
+                                : logOffsetAndTimestamp.timestamp;
+                TableBucketWriteResult<WriteResult> bucketWriteResult =
+                        completeLakeWriter(
+                                bucket, split.getPartitionName(), logEndOffset, timestamp);
+
+                if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+                    // when the log end offset is unknow, the write result must be

Review Comment:
   Spelling error: "unknow" should be "unknown"
   ```suggestion
                       // when the log end offset is unknown, the write result must be
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -110,16 +121,38 @@ public class TieringSourceEnumerator
     public TieringSourceEnumerator(
             Configuration flussConf,
             SplitEnumeratorContext<TieringSplit> context,
-            long pollTieringTableIntervalMs) {
+            long pollTieringTableIntervalMs,
+            long tieringTableDurationMaxMs,
+            long tieringTableDurationDetectIntervalMs) {
+        this(
+                flussConf,
+                context,
+                pollTieringTableIntervalMs,
+                tieringTableDurationMaxMs,
+                tieringTableDurationDetectIntervalMs,
+                SystemClock.getInstance());
+    }
+
+    public TieringSourceEnumerator(
+            Configuration flussConf,
+            SplitEnumeratorContext<TieringSplit> context,
+            long pollTieringTableIntervalMs,
+            long tieringTableDurationMaxMs,
+            long tieringTableDurationDetectIntervalMs,
+            Clock clock) {
         this.flussConf = flussConf;
         this.context = context;
         this.enumeratorMetricGroup = context.metricGroup();
         this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
+        this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
+        this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
         this.pendingSplits = new ArrayList<>();

Review Comment:
   pendingSplits is a plain ArrayList, which is not thread-safe, but it is 
accessed from multiple async callbacks (handleReachMaxTieringDurationTables at 
line 301 and generateTieringSplits at line 431) and from assignSplits, which 
synchronizes only on readersAwaitingSplit. This could lead to 
ConcurrentModificationException or other race conditions. Consider using a 
thread-safe collection such as CopyOnWriteArrayList or 
Collections.synchronizedList, or adding proper synchronization around all 
accesses to pendingSplits.
   ```suggestion
           this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
   ```
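
   A minimal sketch of what the suggestion above does and does not cover. 
`Collections.synchronizedList` makes each individual call atomic, but per its 
Javadoc, iteration and other compound operations still need a manual 
`synchronized` block on the list. The class, field type, and method names 
below are hypothetical stand-ins, not the enumerator's actual code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class PendingSplitsSketch {
    // Hypothetical stand-in for the enumerator's pendingSplits field;
    // split ids are plain strings here for brevity.
    private final List<String> pendingSplits =
            Collections.synchronizedList(new ArrayList<>());

    // A single call on the wrapper is atomic, no extra locking needed.
    void add(String splitId) {
        pendingSplits.add(splitId);
    }

    // Iterating and removing is a compound operation: the wrapper does not
    // guard it, so the caller must hold the list's monitor for the whole loop.
    List<String> drainMatching(String prefix) {
        List<String> drained = new ArrayList<>();
        synchronized (pendingSplits) {
            Iterator<String> it = pendingSplits.iterator();
            while (it.hasNext()) {
                String splitId = it.next();
                if (splitId.startsWith(prefix)) {
                    drained.add(splitId);
                    it.remove();
                }
            }
        }
        return drained;
    }

    int size() {
        return pendingSplits.size();
    }

    public static void main(String[] args) throws InterruptedException {
        PendingSplitsSketch sketch = new PendingSplitsSketch();
        // Two writers adding concurrently, as async callbacks might.
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) sketch.add("table1-" + i);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) sketch.add("table2-" + i);
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        List<String> drained = sketch.drainMatching("table1-");
        System.out.println(drained.size() + " drained, " + sketch.size() + " left");
    }
}
```

   Whether the enumerator's callbacks really run on different threads depends 
on how they are scheduled, so this only illustrates the collection semantics.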



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java:
##########
@@ -72,6 +75,14 @@ public final boolean isTieringLogSplit() {
         return getClass() == TieringLogSplit.class;
     }
 
+    public void forceIgnore() {
+        this.forceIgnore = true;
+    }
+
+    public boolean isForceIgnore() {
+        return forceIgnore;
+    }

Review Comment:
   The forceIgnore field is being added to the TieringSplit class, but it's not 
included in the equals and hashCode methods (which are not shown in this diff 
but exist in the class). This means two splits that differ only in their 
forceIgnore flag will be considered equal, which could lead to incorrect 
behavior in collections like HashSet or HashMap. Consider adding forceIgnore to 
both equals and hashCode methods.
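
   As a rough illustration of the concern, the sketch below uses a hypothetical 
`Split` class (not the real TieringSplit, whose fields are not shown in this 
diff) whose equals and hashCode ignore a mutable flag. Two instances that 
differ only in that flag collapse into one HashSet entry.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class ForceIgnoreEqualitySketch {
    // Hypothetical split type: identity is the id only, the flag is excluded.
    static final class Split {
        final String id;
        boolean forceIgnore;

        Split(String id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Split)) return false;
            return id.equals(((Split) o).id); // forceIgnore is not compared
        }

        @Override
        public int hashCode() {
            return Objects.hash(id); // forceIgnore is not hashed
        }
    }

    // Adds two splits that differ only in forceIgnore and reports how many
    // distinct elements the set ends up holding.
    static int distinctCount() {
        Split normal = new Split("split-0");
        Split ignored = new Split("split-0");
        ignored.forceIgnore = true;

        Set<Split> splits = new HashSet<>();
        splits.add(normal);
        splits.add(ignored); // treated as a duplicate of `normal`
        return splits.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct splits: " + distinctCount());
    }
}
```

   One trade-off to weigh: if a mutable flag is included in hashCode, flipping 
it while the object sits in a hash-based collection makes the entry hard to 
find again, so the flag would ideally be set before the split is stored.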



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -249,6 +337,7 @@ private void generateAndAssignSplits(
 
     private void assignSplits() {
         /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
+        // todo: do we need to add lock?

Review Comment:
   This TODO comment should either be resolved or removed. If locking is 
actually needed for thread safety, implement it; if not needed, remove the 
comment to avoid confusion.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +285,58 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // force finish all splits
+        Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                // get the current offset, timestamp that tiered so far
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_OFFSET
+                                // logEngOffset is equal to offset tiered + 1

Review Comment:
   Spelling error: "logEngOffset" should be "logEndOffset"
   ```suggestion
                                   // logEndOffset is equal to offset tiered + 1
   ```



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
+import org.apache.fluss.flink.tiering.TestingWriteResult;
+import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
+import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
+import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TieringSourceReader}. */
+class TieringSourceReaderTest extends FlinkTestBase {
+
+    @Test
+    void testHandleTieringReachMaxDurationEvent() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_tiering_reach_max_duration");
+        long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR);
+        Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+        conf.set(
+                ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
+                ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
+        try (Connection connection = ConnectionFactory.createConnection(conf)) {
+            FutureCompletingBlockingQueue<
+                            RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+                    elementsQueue = new FutureCompletingBlockingQueue<>(16);
+            TestingReaderContext readerContext = new TestingReaderContext();
+            try (TieringSourceReader<TestingWriteResult> reader =
+                    new TieringSourceReader<>(
+                            elementsQueue,
+                            readerContext,
+                            connection,
+                            new TestingLakeTieringFactory(),
+                            Duration.ofMillis(500))) {
+
+                reader.start();
+
+                // no data, add a split for the table,
+                // should be force be complete after reach max duration
+                TieringLogSplit split =
+                        new TieringLogSplit(
+                                tablePath, new TableBucket(tableId, 0), null, EARLIEST_OFFSET, 100);
+                reader.addSplits(Collections.singletonList(split));
+
+                // send TieringReachMaxDurationEvent
+                TieringReachMaxDurationEvent event = new TieringReachMaxDurationEvent(tableId);
+                reader.handleSourceEvents(event);
+
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>> output =
+                                    new TestingReaderOutput<>();
+                            // should force to finish, the write result is null
+                            reader.pollNext(output);
+                            assertThat(output.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output.getEmittedRecords().get(0);
+                            assertThat(result.writeResult()).isNull();
+                        });
+
+                // write some data
+                writeRows(
+                        connection,
+                        tablePath,
+                        Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")),
+                        true);
+                split =
+                        new TieringLogSplit(
+                                tablePath,
+                                new TableBucket(tableId, 2),
+                                null,
+                                EARLIEST_OFFSET,
+                                // use 100L as end offset, so that
+                                // tiering won't be finished if no tiering reach max duration logic
+                                100L);
+
+                reader.addSplits(Collections.singletonList(split));
+
+                // wait to run one round of tiering to do some tiering
+                FutureCompletingBlockingQueue<
+                                RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+                        blockingQueue = getElementsQueue(reader);
+                // wait blockingQueue is not empty to make sure we have one fetch
+                // in tiering source reader
+                waitUntil(
+                        () -> !blockingQueue.isEmpty(),
+                        Duration.ofSeconds(30),
+                        "Fail to wait element queue is not empty.");
+
+                // send TieringReachMaxDurationEvent
+                event = new TieringReachMaxDurationEvent(tableId);
+                reader.handleSourceEvents(event);
+
+                // make sure tiering will be finished, still maintain the result
+                // of previous tiering
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>>
+                                    output1 = new TestingReaderOutput<>();
+
+                            // should force to finish, the write result isn't null
+                            reader.pollNext(output1);
+                            assertThat(output1.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output1.getEmittedRecords().get(0);
+                            TestingWriteResult testingWriteResult = result.writeResult();
+                            assertThat(testingWriteResult).isNotNull();
+                            assertThat(result.logEndOffset()).isEqualTo(1);
+                        });
+
+                // test add split with force ignore
+                split =
+                        new TieringLogSplit(
+                                tablePath,
+                                new TableBucket(tableId, 1),
+                                null,
+                                EARLIEST_OFFSET,
+                                100L);
+                split.forceIgnore();
+                reader.addSplits(Collections.singletonList(split));
+                // should skip tiering for this split
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>>
+                                    output1 = new TestingReaderOutput<>();
+                            // should force to finish, and the result is null
+                            reader.pollNext(output1);
+                            assertThat(output1.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output1.getEmittedRecords().get(0);
+                            assertThat(result.writeResult()).isNull();
+                        });
+            }
+        }
+    }
+
+    /**
+     * Get the elementsQueue from TieringSourceReader using reflection.
+     *
+     * @param reader the TieringSourceReader instance
+     * @return the elementsQueue field value
+     */
+    @SuppressWarnings("unchecked")
+    private FutureCompletingBlockingQueue<
+                    RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+            getElementsQueue(TieringSourceReader<TestingWriteResult> reader) throws Exception {
+        Class<?> clazz = reader.getClass();
+        while (clazz != null) {
+            try {
+                Field elementsQueueField = clazz.getDeclaredField("elementsQueue");
+                elementsQueueField.setAccessible(true);
+                return (FutureCompletingBlockingQueue<
+                                RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>)
+                        elementsQueueField.get(reader);
+            } catch (NoSuchFieldException e) {
+                // Try parent class
+                clazz = clazz.getSuperclass();
+            }
+        }
+        throw new RuntimeException("No elementsQueue field found");
+    }

Review Comment:
   Using reflection to access private fields in tests can be brittle and may 
break if the implementation changes. Consider making the elementsQueue field 
accessible for testing purposes (e.g., using @VisibleForTesting annotation or 
providing a package-private getter) rather than relying on reflection.
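
   One possible shape for the non-reflective alternative, sketched with a 
hypothetical reader class and a plain `LinkedBlockingQueue` in place of Flink's 
`FutureCompletingBlockingQueue`. The getter name is an assumption; in the real 
code it would presumably also carry a `@VisibleForTesting` annotation, which is 
left as a comment here to keep the sketch dependency-free.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReaderAccessorSketch {
    // Hypothetical stand-in for the reader's private elementsQueue field.
    private final BlockingQueue<String> elementsQueue = new LinkedBlockingQueue<>();

    // Simulates one fetch round handing a record batch to the queue.
    void fetchOnce(String records) {
        elementsQueue.add(records);
    }

    // Package-private accessor for tests in the same package.
    // In production code: annotate with @VisibleForTesting.
    BlockingQueue<String> getElementsQueue() {
        return elementsQueue;
    }

    public static void main(String[] args) {
        ReaderAccessorSketch reader = new ReaderAccessorSketch();
        reader.fetchOnce("batch-0");
        // A test can now check the queue state without any reflection.
        System.out.println("queue empty: " + reader.getElementsQueue().isEmpty());
    }
}
```

   With an accessor like this, a renamed or relocated field becomes a compile 
error in the test rather than a runtime failure.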



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
