Copilot commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2694493537
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -134,7 +158,11 @@ public RecordsWithSplitIds<TableBucketWriteResult<WriteResult>> fetch() throws I
}
} else {
if (currentLogScanner != null) {
- ScanRecords scanRecords = currentLogScanner.poll(POLL_TIMEOUT);
+ // force to complete records
+ if (reachTieringMaxDurationTables.contains(currentTableId)) {
+ return forceCompleteTieringLogRecords();
Review Comment:
There is a potential race condition when checking whether a table has reached its max
duration. The condition on line 162 checks whether currentTableId is in
reachTieringMaxDurationTables, but this set can be modified by the
TieringSourceFetcherManager from a different thread (via
handleTableReachTieringMaxDuration). Since reachTieringMaxDurationTables is a plain
HashSet and therefore not thread-safe, this check can race with those writes. Access
to the set should be synchronized, or a concurrent collection should be used.
```suggestion
synchronized (reachTieringMaxDurationTables) {
    if (reachTieringMaxDurationTables.contains(currentTableId)) {
        return forceCompleteTieringLogRecords();
    }
```
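An alternative to synchronizing every access is to make the set itself thread-safe. A minimal sketch, assuming the field is declared in TieringSplitReader and all cross-thread interaction goes through single set operations (only the field name is taken from the diff; the rest is illustrative):
```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// A concurrent set backed by ConcurrentHashMap: contains() on the fetcher
// thread and add()/remove() from TieringSourceFetcherManager are then safe
// without explicit locking.
private final Set<Long> reachTieringMaxDurationTables = ConcurrentHashMap.newKeySet();
```
If a contains()/remove() pair ever needs to be atomic with respect to other writers, the explicit synchronized block suggested above remains the safer choice.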
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java:
##########
@@ -138,8 +162,24 @@ public Builder<WriteResult> withPollTieringTableIntervalMs(long pollTieringTable
return this;
}
Review Comment:
The builder method withTieringTableDurationMax on line 165 accepts a long
parameter representing milliseconds, but the method name doesn't indicate the
time unit. This could lead to confusion if a developer passes a value in
seconds instead of milliseconds. Consider renaming it to
withTieringTableDurationMaxMs to match the pattern used in
withPollTieringTableIntervalMs (line 160), or adding JavaDoc that clarifies the
expected unit.
```suggestion
    /**
     * Sets the maximum tiering table duration.
     *
     * @param tieringTableDurationMaxMs maximum duration in <strong>milliseconds</strong>
     *     for tiering table data before it is tiered.
     * @return this builder.
     */
```
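If the rename option is taken instead, the method could look like the sketch below (the backing field name tieringTableDurationMaxMs is an assumption; only the *Ms naming convention is the point):
```java
/**
 * Sets the maximum tiering table duration.
 *
 * @param tieringTableDurationMaxMs maximum duration in milliseconds for tiering
 *     table data before it is tiered
 * @return this builder
 */
public Builder<WriteResult> withTieringTableDurationMaxMs(long tieringTableDurationMaxMs) {
    // field name assumed to mirror the parameter
    this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
    return this;
}
```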
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.ExceptionUtils;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
+import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The IT case for tiering. */
+class TieringITCase extends FlinkTieringTestBase {
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkTieringTestBase.beforeAll();
+ }
+
+ @AfterAll
+ protected static void afterAll() throws Exception {
+ FlinkTieringTestBase.afterAll();
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ execEnv =
+ StreamExecutionEnvironment.getExecutionEnvironment()
+ .setParallelism(1)
+ .setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ }
+
+ @Test
+ void testTieringReachMaxDuration() throws Exception {
+ TablePath logTablePath = TablePath.of("fluss", "logtable");
+ createTable(logTablePath, false);
+ TablePath pkTablePath = TablePath.of("fluss", "pktable");
+ long pkTableId = createTable(pkTablePath, true);
+
+ // write some records to log table
+ List<InternalRow> rows = new ArrayList<>();
+ int recordCount = 6;
+ for (int i = 0; i < recordCount; i++) {
+ rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
+ }
+ writeRows(logTablePath, rows, true);
+
+ rows = new ArrayList<>();
+ // write 6 records to the primary key table; each bucket should only contain a few records
+ for (int i = 0; i < recordCount; i++) {
+ rows.add(GenericRow.of(i, BinaryString.fromString("v" + i)));
+ }
+ writeRows(pkTablePath, rows, false);
+
+ waitUntilSnapshot(pkTableId, 3, 0);
+
+ // set tiering duration to a small value for testing purpose
+ Configuration lakeTieringConfig = new Configuration();
+ lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
+ lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
+ JobClient jobClient = buildTieringJob(execEnv, lakeTieringConfig);
+
+ try {
+ // verify the tiered record count is less than the table's total record count
+ // to make sure tiering is forced to complete when the max duration is reached
+ LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath);
+ long tieredRecords = countTieredRecords(logTableLakeSnapshot);
+ assertThat(tieredRecords).isLessThan(recordCount);
+
+ // verify the tiered record count is less than the table's total record count
+ // to make sure tiering is forced to complete when the max duration is reached
+ LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath);
+ tieredRecords = countTieredRecords(pkTableLakeSnapshot);
+ assertThat(tieredRecords).isLessThan(recordCount);
Review Comment:
The test assertions on lines 108 and 114 check that tieredRecords is less
than recordCount (6), which implicitly assumes that some but not all records
will be tiered. This assumption is fragile and timing-dependent: depending on
when the max duration is reached, all records could be tiered (if they are
written fast enough) or none at all (if the duration expires immediately). The
test should either use more robust assertions or add synchronization to
guarantee the intended partial-tiering behavior.
```suggestion
// verify the tiered records do not exceed the table total record
// under the configured max duration
LakeSnapshot logTableLakeSnapshot = waitLakeSnapshot(logTablePath);
long tieredRecords = countTieredRecords(logTableLakeSnapshot);
assertThat(tieredRecords).isLessThanOrEqualTo(recordCount);

// verify the tiered records do not exceed the table total record
// under the configured max duration
LakeSnapshot pkTableLakeSnapshot = waitLakeSnapshot(pkTablePath);
tieredRecords = countTieredRecords(pkTableLakeSnapshot);
assertThat(tieredRecords).isLessThanOrEqualTo(recordCount);
```
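One way to make the intent deterministic is to assert only a timing-independent bound and then wait for tiering to eventually catch up. A hedged sketch using the test's own waitLakeSnapshot/countTieredRecords helpers, assuming the tiering job keeps running and eventually tiers all written records (the polling loop and deadline are illustrative, not part of the PR):
```java
// Timing-independent bound: never more than what was written.
assertThat(tieredRecords).isBetween(0L, (long) recordCount);

// Poll until tiering catches up with everything that was written, bounded
// by a deadline so the test still fails fast on a real hang.
long deadline = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis();
long tiered = tieredRecords;
while (tiered < recordCount && System.currentTimeMillis() < deadline) {
    Thread.sleep(100);
    tiered = countTieredRecords(waitLakeSnapshot(logTablePath));
}
assertThat(tiered).isEqualTo(recordCount);
```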
##########
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java:
##########
@@ -96,4 +115,154 @@ public TableDescriptor getTable(TablePath tablePath) {
return tableByPath.get(tablePath);
}
}
+
+ private static class TestingPaimonTieringFactory
+ implements LakeTieringFactory<TestingPaimonWriteResult, TestPaimonCommittable> {
+
+ @Override
+ public LakeWriter<TestingPaimonWriteResult> createLakeWriter(
+ WriterInitContext writerInitContext) {
+ return new TestingPaimonWriter(writerInitContext.tableInfo());
+ }
+
+ @Override
+ public SimpleVersionedSerializer<TestingPaimonWriteResult> getWriteResultSerializer() {
+ return new TestingPaimonWriteResultSerializer();
+ }
+
+ @Override
+ public LakeCommitter<TestingPaimonWriteResult, TestPaimonCommittable> createLakeCommitter(
+ CommitterInitContext committerInitContext) throws IOException {
+ return new TestingPaimonCommitter();
+ }
+
+ @Override
+ public SimpleVersionedSerializer<TestPaimonCommittable> getCommittableSerializer() {
+ return new SimpleVersionedSerializer<TestPaimonCommittable>() {
+ @Override
+ public int getVersion() {
+ return 0;
+ }
+
+ @Override
+ public byte[] serialize(TestPaimonCommittable obj) throws IOException {
+ return new byte[0];
+ }
+
+ @Override
+ public TestPaimonCommittable deserialize(int version, byte[] serialized)
+ throws IOException {
+ return new TestPaimonCommittable();
+ }
+ };
+ }
+ }
+
+ private static class TestingPaimonWriter implements LakeWriter<TestingPaimonWriteResult> {
+
+ static ConfigOption<Duration> writePauseOption =
+ key("write-pause").durationType().noDefaultValue();
+
+ private int writtenRecords = 0;
+ private final Duration writePause;
+
+ private TestingPaimonWriter(TableInfo tableInfo) {
+ this.writePause = tableInfo.getCustomProperties().get(writePauseOption);
+ }
+
+ @Override
+ public void write(LogRecord record) throws IOException {
+ try {
+ if (writePause != null) {
+ Thread.sleep(writePause.toMillis());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while pausing before
write", e);
+ }
+ writtenRecords += 1;
+ }
+
+ @Override
+ public TestingPaimonWriteResult complete() throws IOException {
+ return new TestingPaimonWriteResult(writtenRecords);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ }
+
+ private static class TestingPaimonWriteResult {
+ private final int writtenRecords;
+
+ public TestingPaimonWriteResult(int writtenRecords) {
+ this.writtenRecords = writtenRecords;
+ }
Review Comment:
The TestingPaimonWriteResult class is missing a getter for the writtenRecords
field, so tests and other components that need to verify the recorded count
cannot access it.
```suggestion
        }

        public int getWrittenRecords() {
            return writtenRecords;
        }
```
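With the getter in place, a test could then read the count directly off the result; a hypothetical usage (writer and expectedRecordCount are illustrative names, not part of the PR):
```java
// Hypothetical verification in a test, assuming the getter above is added.
TestingPaimonWriteResult result = writer.complete();
assertThat(result.getWrittenRecords()).isEqualTo(expectedRecordCount);
```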
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source;
+
+import org.apache.fluss.flink.adapter.SingleThreadFetcherManagerAdapter;
+import org.apache.fluss.flink.tiering.source.split.TieringSplit;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
+import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The SplitFetcherManager for the tiering source. This class is needed to notify the
+ * {@link TieringSplitReader} when a table reaches its tiering deadline.
+ */
+public class TieringSourceFetcherManager<WriteResult>
+ extends SingleThreadFetcherManagerAdapter<
+ TableBucketWriteResult<WriteResult>, TieringSplit> {
+
+ public TieringSourceFetcherManager(
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
+ elementsQueue,
+ Supplier<SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit>>
+ splitReaderSupplier,
+ Configuration configuration,
+ Consumer<Collection<String>> splitFinishedHook) {
+ super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
+ }
+
+ public void markTableReachTieringMaxDuration(long tableId) {
+ if (!fetchers.isEmpty()) {
+ // The fetcher thread is still running. This should be the majority of the cases.
+ fetchers.values()
+ .forEach(
+ splitFetcher ->
+ enqueueMarkTableReachTieringMaxDurationTask(
+ splitFetcher, tableId));
+ } else {
+ SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher =
+ createSplitFetcher();
+ enqueueMarkTableReachTieringMaxDurationTask(splitFetcher, tableId);
+ startFetcher(splitFetcher);
+ }
+ }
+
+ private void enqueueMarkTableReachTieringMaxDurationTask(
+ SplitFetcher<TableBucketWriteResult<WriteResult>, TieringSplit> splitFetcher,
+ long reachTieringDeadlineTable) {
+ splitFetcher.enqueueTask(
+ new SplitFetcherTask() {
+ @Override
+ public boolean run() {
+ ((TieringSplitReader<WriteResult>) splitFetcher.getSplitReader())
+ .handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
Review Comment:
The cast on line 76 to TieringSplitReader is unsafe and could fail with a
ClassCastException if the split reader is of a different type. While this may
work in the current implementation, it would be better to add a type check
before casting or to ensure type safety through generic constraints. Consider
introducing an interface that both types implement, or adding an instanceof
check with appropriate error handling.
```suggestion
SplitReader<TableBucketWriteResult<WriteResult>, TieringSplit> splitReader =
        splitFetcher.getSplitReader();
if (splitReader instanceof TieringSplitReader) {
    @SuppressWarnings("unchecked")
    TieringSplitReader<WriteResult> tieringSplitReader =
            (TieringSplitReader<WriteResult>) splitReader;
    tieringSplitReader.handleTableReachTieringMaxDuration(reachTieringDeadlineTable);
} else {
    throw new IllegalStateException(
            "Expected SplitReader of type "
                    + TieringSplitReader.class.getName()
                    + " but found "
                    + (splitReader == null
                            ? "null"
                            : splitReader.getClass().getName()));
}
```
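The generic-constraints route could avoid the runtime check altogether by requiring the concrete reader type at construction time. A rough sketch under that assumption (TieringSplitReader implements SplitReader for these type arguments, as the existing cast already relies on; the trade-off is that the typed reader handle must then be tracked separately, since SplitFetcher.getSplitReader() still returns the base interface):
```java
// Sketch: narrow the supplier so only TieringSplitReader instances can be
// created; the method reference re-widens it for the superclass constructor.
public TieringSourceFetcherManager(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>>
                elementsQueue,
        Supplier<TieringSplitReader<WriteResult>> splitReaderSupplier,
        Configuration configuration,
        Consumer<Collection<String>> splitFinishedHook) {
    super(elementsQueue, splitReaderSupplier::get, configuration, splitFinishedHook);
}
```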
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java:
##########
@@ -92,29 +97,38 @@ static void afterAll() throws Exception {
conn.close();
conn = null;
}
+
+ System.out.println("after all");
Review Comment:
There's a debug print statement (System.out.println) that should be removed.
It looks like leftover debugging output that doesn't belong in committed test
code.
```suggestion
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java:
##########
@@ -0,0 +1,160 @@
+ [license header, imports, and @BeforeAll/@AfterAll setup identical to the
+ TieringITCase.java hunk quoted above]
+
+/** The IT case for tiering. */
+class TieringITCase extends FlinkTieringTestBase {
+
+ @BeforeEach
Review Comment:
This method overrides FlinkTieringTestBase.beforeEach; it is advisable to add
an @Override annotation.
```suggestion
@BeforeEach
@Override
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:
##########
@@ -248,6 +285,58 @@ private void mayCreateLogScanner() {
}
}
+ private RecordsWithSplitIds<TableBucketWriteResult<WriteResult>>
+ forceCompleteTieringLogRecords() throws IOException {
+ Map<TableBucket, TableBucketWriteResult<WriteResult>> writeResults = new HashMap<>();
+ Map<TableBucket, String> finishedSplitIds = new HashMap<>();
+
+ // force finish all splits
+ Iterator<Map.Entry<TableBucket, TieringSplit>> currentTieringSplitsIterator =
+ currentTableSplitsByBucket.entrySet().iterator();
+ while (currentTieringSplitsIterator.hasNext()) {
+ Map.Entry<TableBucket, TieringSplit> entry = currentTieringSplitsIterator.next();
+ TableBucket bucket = entry.getKey();
+ TieringSplit split = entry.getValue();
+ if (split != null && split.isTieringLogSplit()) {
+ // get the current offset, timestamp that tiered so far
+ LogOffsetAndTimestamp logOffsetAndTimestamp =
+ currentTableTieredOffsetAndTimestamp.get(bucket);
+ long logEndOffset =
+ logOffsetAndTimestamp == null
+ ? UNKNOWN_BUCKET_OFFSET
+ // logEndOffset is equal to offset tiered + 1
+ : logOffsetAndTimestamp.logOffset + 1;
+ long timestamp =
+ logOffsetAndTimestamp == null
+ ? UNKNOWN_BUCKET_TIMESTAMP
+ : logOffsetAndTimestamp.timestamp;
+ TableBucketWriteResult<WriteResult> bucketWriteResult =
+ completeLakeWriter(
+ bucket, split.getPartitionName(), logEndOffset, timestamp);
+
+ if (logEndOffset == UNKNOWN_BUCKET_OFFSET) {
+ // when the log end offset is unknown, the write result must be
+ // null; otherwise, we should throw an exception directly to avoid
+ // data inconsistency
+ checkState(
+ bucketWriteResult.writeResult() == null,
+ "bucketWriteResult must be null when log end
offset is unknown when tiering "
+ + split);
+ }
+
+ writeResults.put(bucket, bucketWriteResult);
+ finishedSplitIds.put(bucket, split.splitId());
+ LOG.info(
+ "Split {} is forced to be finished due to tiering
reach max duration.",
+ split.splitId());
+ currentTieringSplitsIterator.remove();
+ }
+ }
+ reachTieringMaxDurationTables.remove(this.currentTableId);
+ mayFinishCurrentTable();
+ return new TableBucketWriteResultWithSplitIds(writeResults, finishedSplitIds);
Review Comment:
The forceCompleteTieringLogRecords method only handles TieringLogSplit (line
300 checks split.isTieringLogSplit()). However, if there are
TieringSnapshotSplits in currentTableSplitsByBucket when the max duration is
reached, they are skipped silently and remain in the map. This could lead
to incomplete cleanup and potentially to resource leaks or hanging tiering
operations. The method should handle both log splits and snapshot splits, or
document why snapshot splits can be safely ignored during force completion.
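A minimal sketch of the explicit-handling option, written in the style of the loop above (isTieringSnapshotSplit is assumed to exist as the counterpart of isTieringLogSplit, and leaving snapshot splits to run to completion is an assumption that would need to be validated against the tiering protocol):
```java
} else if (split != null && split.isTieringSnapshotSplit()) {
    // Assumption: snapshot splits are left to run to completion rather than
    // being force-completed; at minimum, log the decision instead of
    // skipping the split silently.
    LOG.info(
            "Split {} is a snapshot split and is not force-completed when "
                    + "tiering reaches the max duration; it will run to completion.",
            split.splitId());
}
```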
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]