Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2711333093


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java:
##########
@@ -61,14 +64,17 @@ public class TieringSource<WriteResult>
     private final Configuration flussConf;
     private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
     private final long pollTieringTableIntervalMs;
+    private final long tieringTableDurationMaxMs;
 
     public TieringSource(
             Configuration flussConf,
             LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
-            long pollTieringTableIntervalMs) {
+            long pollTieringTableIntervalMs,
+            long tieringTableDurationMaxMs) {
         this.flussConf = flussConf;
         this.lakeTieringFactory = lakeTieringFactory;
         this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
+        this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;

Review Comment:
   Constructor parameter `tieringTableDurationMaxMs` has an inconsistent naming 
convention compared to the builder method `withTieringTableMaxDurationMs`. The 
constructor uses `tieringTableDurationMaxMs` while the builder uses 
`tieringTableMaxDurationMs` (note the different word ordering). This 
inconsistency can cause confusion. Standardize the naming to either 
`tieringTableDurationMaxMs` or `tieringTableMaxDurationMs` throughout.
   ```suggestion
               long tieringTableMaxDurationMs) {
           this.flussConf = flussConf;
           this.lakeTieringFactory = lakeTieringFactory;
           this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
           this.tieringTableDurationMaxMs = tieringTableMaxDurationMs;
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -92,38 +100,54 @@ public class TieringSplitReader<WriteResult>
     // map from table bucket to split id
     private final Map<TableBucket, TieringSplit> currentTableSplitsByBucket;
     private final Map<TableBucket, Long> currentTableStoppingOffsets;
-    private final Set<TieringLogSplit> currentTableEmptyLogSplits;
+
+    private final Map<TableBucket, LogOffsetAndTimestamp> currentTableTieredOffsetAndTimestamp;
+
+    private final Set<TieringSplit> currentEmptySplits;
 
     public TieringSplitReader(
            Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
+        this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
+    }
+
+    protected TieringSplitReader(
+            Connection connection,

Review Comment:
   The visibility change from private to protected and the added constructor parameter reduce
encapsulation. The `pollTimeout` field is only used for testing but now becomes part of the
production API surface. Consider keeping the production constructor as-is and using a
test-specific subclass or factory method instead.
   ```suggestion
       /**
        * Public constructor used in production, relying on the default poll timeout.
        */
       public TieringSplitReader(
               Connection connection, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
           this(connection, lakeTieringFactory, DEFAULT_POLL_TIMEOUT);
       }
   
       /**
        * Package-private factory intended for testing to allow custom poll timeout configuration.
        */
       static <W> TieringSplitReader<W> createForTesting(
               Connection connection,
               LakeTieringFactory<W, ?> lakeTieringFactory,
               Duration pollTimeout) {
           return new TieringSplitReader<>(connection, lakeTieringFactory, pollTimeout);
       }
   
       private TieringSplitReader(
               Connection connection,
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
+import org.apache.fluss.flink.tiering.source.split.TieringSplit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The SplitFetcherManager for tiering source. This class is needed to help notify a table reaches
+ * to deadline of tiering to {@link TieringSplitReader}.

Review Comment:
   The comment says "reaches to deadline" but should read "reaches the deadline" (replace "to"
with "the").
   ```suggestion
    * the deadline of tiering to {@link TieringSplitReader}.
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +285,58 @@ private void mayCreateLogScanner() {
         }
     }
 
+    private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+            forceCompleteTieringLogRecords() throws IOException {
+        Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+        Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+        // force finish all splits
+        Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
+                currentTableSplitsByBucket.entrySet().iterator();
+        while (currentTieringSplitsIterator.hasNext()) {
+            Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
+            TableBucket bucket = entry.getKey();
+            TieringSplit split = entry.getValue();
+            if (split != null && split.isTieringLogSplit()) {
+                // get the current offset, timestamp that tiered so far
+                LogOffsetAndTimestamp logOffsetAndTimestamp =
+                        currentTableTieredOffsetAndTimestamp.get(bucket);
+                long logEndOffset =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_OFFSET
+                                // logEndOffset is equal to offset tiered + 1
+                                : logOffsetAndTimestamp.logOffset + 1;
+                long timestamp =
+                        logOffsetAndTimestamp == null
+                                ? UNKNOWN_BUCKET_TIMESTAMP
+                                : logOffsetAndTimestamp.timestamp;
+                TableBucketWriteResult<WriteResult> bucketWriteResult =
+                        completeLakeWriter(
+                                bucket, split.getPartitionName(), logEndOffset, timestamp);
+
+                if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+                    // when the log end offset is unknown, the write result must be
+                    // null, otherwise, we should throw exception directly to avoid data
+                    // inconsistent
+                    checkState(
+                            bucketWriteResult.writeResult() == null,
+                            "bucketWriteResult must be null when log end offset is unknown when tiering "
+                                    + split);

Review Comment:
   The state check requires that `bucketWriteResult.writeResult()` must be null 
when `logEndOffset == UNKNOWN_BUCKET_OFFSET`, but this assumes the 
`completeLakeWriter` method always returns null when no data has been written. 
If there's any edge case where a writer returns a non-null result even when no 
offset has been recorded, this will throw an exception. Consider adding more 
defensive validation or clearer documentation on the contract of 
`completeLakeWriter`.
   ```suggestion
                    // When the log end offset is unknown, we expect the write result to be null.
                    // If it's not, log a warning instead of failing hard to avoid unexpected
                    // runtime exceptions due to edge cases in completeLakeWriter.
                    if (bucketWriteResult.writeResult() != null) {
                        LOG.warn(
                                "bucketWriteResult has non-null writeResult while log end offset is "
                                        + "unknown when tiering {}",
                                split);
                    }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -339,30 +438,29 @@ private TableBucketWriteResult<WriteResult> completeLakeWriter(
                 checkNotNull(currentTableNumberOfSplits));
     }
 
-    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringLogSplit> emptySplits) {
+    private TableBucketWriteResultWithSplitIds forEmptySplits(Set<TieringSplit> emptySplits) {
         Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
         Map<TableBucket, String> finishedSplitIds = new HashMap<>();
-        for (TieringLogSplit logSplit : emptySplits) {
-            TableBucket tableBucket = logSplit.getTableBucket();
-            finishedSplitIds.put(tableBucket, logSplit.splitId());
+        for (TieringSplit tieringSplit : emptySplits) {
+            TableBucket tableBucket = tieringSplit.getTableBucket();
+            finishedSplitIds.put(tableBucket, tieringSplit.splitId());
             writeResults.put(
                     tableBucket,
                     toTableBucketWriteResult(
-                            logSplit.getTablePath(),
+                            tieringSplit.getTablePath(),
                             tableBucket,
-                            logSplit.getPartitionName(),
+                            tieringSplit.getPartitionName(),
                             null,
-                            logSplit.getStoppingOffset(),
+                            UNKNOWN_BUCKET_OFFSET,

Review Comment:
   This change switches empty splits from `logSplit.getStoppingOffset()` to
UNKNOWN_BUCKET_OFFSET, while the normal log split flow (lines 451-452) still reports a concrete
stopping offset. Using UNKNOWN_BUCKET_OFFSET for empty splits is the consistent and correct
choice, but the comment at lines 451-452 should clarify why this differs from the regular
stopping offset used before this change.
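
   For example, a brief clarifying comment along these lines (wording illustrative) would make
the intent explicit:
   ```java
   // Empty splits carry no tiered data, so there is no meaningful log end offset to report.
   // Use UNKNOWN_BUCKET_OFFSET rather than the split's stopping offset, which would wrongly
   // imply that data up to that offset had been tiered.
   ```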



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -298,24 +342,21 @@ private void generateAndAssignSplits(
     }
 
     private void assignSplits() {
-        // we don't assign splits during failovering
+        // we don't assign splits during failover
         if (isFailOvering) {
             return;
         }
-        /* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
-        synchronized (readersAwaitingSplit) {
-            if (!readersAwaitingSplit.isEmpty()) {
-                final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
-                for (Integer nextAwaitingReader : readers) {
-                    if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
-                        readersAwaitingSplit.remove(nextAwaitingReader);
-                        continue;
-                    }
-                    if (!pendingSplits.isEmpty()) {
-                        TieringSplit tieringSplit = pendingSplits.remove(0);
-                        context.assignSplit(tieringSplit, nextAwaitingReader);
-                        readersAwaitingSplit.remove(nextAwaitingReader);
-                    }
+        if (!readersAwaitingSplit.isEmpty()) {
+            final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
+            for (Integer nextAwaitingReader : readers) {
+                if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
+                    readersAwaitingSplit.remove(nextAwaitingReader);
+                    continue;
+                }
+                if (!pendingSplits.isEmpty()) {
+                    TieringSplit tieringSplit = pendingSplits.remove(0);
+                    context.assignSplit(tieringSplit, nextAwaitingReader);
+                    readersAwaitingSplit.remove(nextAwaitingReader);
                 }
                 }

Review Comment:
   The removed synchronization on `readersAwaitingSplit` (previously on line 348) could lead to
race conditions. While the collections themselves are now synchronized (lines 132-133), the
compound operations (checking if empty, iterating, removing) are not atomic. Multiple threads
could attempt to assign splits concurrently, potentially assigning the same split twice or
skipping a waiting reader. The synchronization should be restored, or the logic restructured so
the entire assignment operation is thread safe.
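
   A minimal sketch of restoring the guard, based on the removed block (assuming
`readersAwaitingSplit` and `pendingSplits` are the only shared state touched here):
   ```java
   private void assignSplits() {
       // we don't assign splits during failover
       if (isFailOvering) {
           return;
       }
       // Guard the whole check-iterate-remove sequence so concurrent callers
       // (e.g. addSplitsBack and handleSplitRequest) cannot interleave.
       synchronized (readersAwaitingSplit) {
           for (Integer nextAwaitingReader : readersAwaitingSplit.toArray(new Integer[0])) {
               if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
                   readersAwaitingSplit.remove(nextAwaitingReader);
                   continue;
               }
               if (!pendingSplits.isEmpty()) {
                   TieringSplit tieringSplit = pendingSplits.remove(0);
                   context.assignSplit(tieringSplit, nextAwaitingReader);
                   readersAwaitingSplit.remove(nextAwaitingReader);
               }
           }
       }
   }
   ```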



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -113,13 +121,18 @@ public class TieringSourceEnumerator
     public TieringSourceEnumerator(
             Configuration flussConf,
             SplitEnumeratorContext<TieringSplit> context,
-            long pollTieringTableIntervalMs) {
+            long pollTieringTableIntervalMs,
+            long tieringTableMaxDurationMs) {
         this.flussConf = flussConf;
         this.context = context;
+        this.timerService =

Review Comment:
   The `timerService` field is created but never properly shut down on all error paths. If an
exception occurs during `start()` or other operations before `close()` is called, the timer
service thread will leak. Consider using try-with-resources, ensuring cleanup in a finally
block, or handling shutdown more defensively in the close method.
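
   One possible shape, assuming `timerService` is a `ScheduledExecutorService` (its declaration
is not visible in this hunk):
   ```java
   @Override
   public void close() throws IOException {
       try {
           // ... existing enumerator cleanup ...
       } finally {
           // Always stop the timer thread, even if the cleanup above throws.
           if (timerService != null) {
               timerService.shutdownNow();
           }
       }
   }
   ```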



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -430,17 +533,29 @@ private void finishCurrentTable() throws IOException {
         } catch (Exception e) {
             throw new IOException("Fail to finish current table.", e);
         }
-
+        reachTieringMaxDurationTables.remove(currentTableId);
         // before switch to a new table, mark all as empty or null
         currentTableId = null;
         currentTablePath = null;
         currentTableNumberOfSplits = null;
         currentPendingSnapshotSplits.clear();
         currentTableStoppingOffsets.clear();
-        currentTableEmptyLogSplits.clear();
+        currentTableTieredOffsetAndTimestamp.clear();
         currentTableSplitsByBucket.clear();
     }
 
+    /**
+     * Handle a table reach max tiering duration. This will mark the current table as reaching max
+     * duration, and it will be force completed in the next fetch cycle.
+     */
+    public void handleTableReachTieringMaxDuration(long tableId) {
+        if ((currentTableId != null && currentTableId.equals(tableId))
+                || pendingTieringSplits.containsKey(tableId)) {

Review Comment:
   The method `handleTableReachTieringMaxDuration` adds the table to 
`reachTieringMaxDurationTables` even if the table is only in 
`pendingTieringSplits` and not yet the current table. However, the force 
completion logic in `forceCompleteTieringLogRecords()` (line 160) only checks 
if the current table is in the set. This means if a table reaches max duration 
before it becomes the current table, the flag will be set but never acted upon 
until that table becomes current. This could lead to memory leaks in the set or 
unexpected behavior. Consider only adding to the set when it's the current 
table, or handle pending tables differently.
   ```suggestion
           if (currentTableId != null && currentTableId.equals(tableId)) {
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -391,6 +435,15 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
             } else {
                 tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
                 pendingSplits.addAll(tieringSplits);
+
+                timerService.schedule(
+                        () ->
+                                context.runInCoordinatorThread(
+                                        () ->
+                                                handleTableTieringReachMaxDuration(
+                                                        tieringTable.f0, tieringTable.f1)),

Review Comment:
   The timer scheduled on lines 437-444 is not cancelled when a table's tiering completes
normally or fails. If a table finishes tiering before the max duration, the timer will still
fire and execute the completion logic, potentially causing unexpected behavior or wasted
resources. Consider tracking these timers and cancelling them when the table's tiering
completes (in `handleSourceEvent` for `FinishedTieringEvent` and `FailedTieringEvent`).
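
   A sketch of one way to do this, assuming `timerService` is a `ScheduledExecutorService` and
a new `Map<Long, ScheduledFuture<?>> tieringMaxDurationTimers` field keyed by table id (both
names illustrative):
   ```java
   // When scheduling, keep the handle so the timer can be cancelled later.
   ScheduledFuture<?> timer =
           timerService.schedule(
                   () ->
                           context.runInCoordinatorThread(
                                   () ->
                                           handleTableTieringReachMaxDuration(
                                                   tieringTable.f0, tieringTable.f1)),
                   tieringTableMaxDurationMs,
                   TimeUnit.MILLISECONDS);
   tieringMaxDurationTimers.put(tieringTable.f0, timer);

   // In handleSourceEvent, on FinishedTieringEvent / FailedTieringEvent:
   ScheduledFuture<?> pendingTimer = tieringMaxDurationTimers.remove(tableId);
   if (pendingTimer != null) {
       pendingTimer.cancel(false);
   }
   ```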



##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.TestingLakeTieringFactory;
+import org.apache.fluss.flink.tiering.TestingWriteResult;
+import org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
+import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
+import org.apache.fluss.flink.utils.FlinkTestBase;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TieringSourceReader}. */
+class TieringSourceReaderTest extends FlinkTestBase {
+
+    @Test
+    void testHandleTieringReachMaxDurationEvent() throws Exception {
+        TablePath tablePath = TablePath.of("fluss", "test_tiering_reach_max_duration");
+        long tableId = createTable(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR);
+        Configuration conf = new Configuration(FLUSS_CLUSTER_EXTENSION.getClientConfig());
+        conf.set(
+                ConfigOptions.CLIENT_WRITER_BUCKET_NO_KEY_ASSIGNER,
+                ConfigOptions.NoKeyAssigner.ROUND_ROBIN);
+        try (Connection connection = ConnectionFactory.createConnection(conf)) {
+            FutureCompletingBlockingQueue<
+                            RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+                    elementsQueue = new FutureCompletingBlockingQueue<>(16);
+            TestingReaderContext readerContext = new TestingReaderContext();
+            try (TieringSourceReader<TestingWriteResult> reader =
+                    new TieringSourceReader<>(
+                            elementsQueue,
+                            readerContext,
+                            connection,
+                            new TestingLakeTieringFactory(),
+                            Duration.ofMillis(500))) {
+
+                reader.start();
+
+                // no data, add a split for the table,
+                // should be force be complete after reach max duration
+                TieringLogSplit split =
+                        new TieringLogSplit(
+                                tablePath, new TableBucket(tableId, 0), null, EARLIEST_OFFSET, 100);
+                reader.addSplits(Collections.singletonList(split));
+
+                // send TieringReachMaxDurationEvent
+                TieringReachMaxDurationEvent event = new TieringReachMaxDurationEvent(tableId);
+                reader.handleSourceEvents(event);
+
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            
TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>> output =
+                                    new TestingReaderOutput<>();
+                            // should force to finish, the write result is null
+                            reader.pollNext(output);
+                            assertThat(output.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output.getEmittedRecords().get(0);
+                            assertThat(result.writeResult()).isNull();
+                        });
+
+                // write some data
+                writeRows(
+                        connection,
+                        tablePath,
+                        Arrays.asList(row(0, "v0"), row(1, "v1"), row(2, "v2")),
+                        true);
+                split =
+                        new TieringLogSplit(
+                                tablePath,
+                                new TableBucket(tableId, 2),
+                                null,
+                                EARLIEST_OFFSET,
+                                // use 100L as end offset, so that
+                                // tiering won't be finished if no tiering reach max duration logic
+                                100L);
+
+                reader.addSplits(Collections.singletonList(split));
+
+                // wait to run one round of tiering to do some tiering
+                FutureCompletingBlockingQueue<
+                                RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+                        blockingQueue = getElementsQueue(reader);
+                // wait blockingQueue is not empty to make sure we have one fetch
+                // in tiering source reader
+                waitUntil(
+                        () -> !blockingQueue.isEmpty(),
+                        Duration.ofSeconds(30),
+                        "Fail to wait element queue is not empty.");
+
+                // send TieringReachMaxDurationEvent
+                event = new TieringReachMaxDurationEvent(tableId);
+                reader.handleSourceEvents(event);
+
+                // make sure tiering will be finished, still maintain the result
+                // of previous tiering
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>>
+                                    output1 = new TestingReaderOutput<>();
+
+                            // should force to finish, the write result isn't null
+                            reader.pollNext(output1);
+                            assertThat(output1.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output1.getEmittedRecords().get(0);
+                            TestingWriteResult testingWriteResult = result.writeResult();
+                            assertThat(testingWriteResult).isNotNull();
+                            assertThat(result.logEndOffset()).isEqualTo(1);
+                        });
+
+                // test add split with skipCurrentRound
+                split =
+                        new TieringLogSplit(
+                                tablePath,
+                                new TableBucket(tableId, 1),
+                                null,
+                                EARLIEST_OFFSET,
+                                100L);
+                split.skipCurrentRound();
+                reader.addSplits(Collections.singletonList(split));
+                // should skip tiering for this split
+                retry(
+                        Duration.ofMinutes(1),
+                        () -> {
+                            TestingReaderOutput<TableBucketWriteResult<TestingWriteResult>>
+                                    output1 = new TestingReaderOutput<>();
+                            // should force to finish, and the result is null
+                            reader.pollNext(output1);
+                            assertThat(output1.getEmittedRecords()).hasSize(1);
+                            TableBucketWriteResult<TestingWriteResult> result =
+                                    output1.getEmittedRecords().get(0);
+                            assertThat(result.writeResult()).isNull();
+                        });
+            }
+        }
+    }
+
+    /**
+     * Get the elementsQueue from TieringSourceReader using reflection.
+     *
+     * @param reader the TieringSourceReader instance
+     * @return the elementsQueue field value
+     */
+    @SuppressWarnings("unchecked")
+    private FutureCompletingBlockingQueue<
+                    RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>
+            getElementsQueue(TieringSourceReader<TestingWriteResult> reader) throws Exception {
+        Class<?> clazz = reader.getClass();
+        while (clazz != null) {
+            try {
+                Field elementsQueueField = clazz.getDeclaredField("elementsQueue");
+                elementsQueueField.setAccessible(true);
+                return (FutureCompletingBlockingQueue<
+                                RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>)
+                        elementsQueueField.get(reader);
+            } catch (NoSuchFieldException e) {
+                // Try parent class
+                clazz = clazz.getSuperclass();
+            }
+        }
+        throw new RuntimeException("No elementsQueue field found");

Review Comment:
   Using reflection to access private fields from parent classes is fragile and 
breaks encapsulation. If the parent class structure changes or the field is 
renamed, this will fail at runtime. Consider adding a protected getter in the 
parent class or restructuring to avoid the need for reflection.
   ```suggestion
           Field elementsQueueField = TieringSourceReader.class.getDeclaredField("elementsQueue");
           elementsQueueField.setAccessible(true);
           return (FutureCompletingBlockingQueue<
                           RecordsWithSplitIds<TableBucketWriteResult<TestingWriteResult>>>)
                   elementsQueueField.get(reader);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
