This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 939043e00 [flink] unify the behavior of different startup mode for lake table (#2572)
939043e00 is described below
commit 939043e00c8dd8f0387a355dbde207be988e0fe0
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Feb 6 14:16:23 2026 +0800
[flink] unify the behavior of different startup mode for lake table (#2572)
---
.../fluss/flink/source/FlinkTableSource.java | 53 +-----
.../flink/FlinkUnionReadFromTimestampITCase.java | 194 ---------------------
2 files changed, 4 insertions(+), 243 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index bbfbff402..8e234d86c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -35,14 +35,10 @@ import org.apache.fluss.metadata.ChangelogImage;
import org.apache.fluss.metadata.DeleteBehavior;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.predicate.GreaterOrEqual;
-import org.apache.fluss.predicate.LeafPredicate;
import org.apache.fluss.predicate.PartitionPredicateVisitor;
import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.PredicateBuilder;
import org.apache.fluss.predicate.PredicateVisitor;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
import org.apache.flink.annotation.VisibleForTesting;
@@ -98,9 +94,7 @@ import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPred
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
-import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.fluss.utils.Preconditions.checkNotNull;
-import static org.apache.fluss.utils.Preconditions.checkState;
/** Flink table source to scan Fluss data. */
public class FlinkTableSource
@@ -307,33 +301,23 @@ public class FlinkTableSource
flussRowType = flussRowType.project(projectedFields);
}
OffsetsInitializer offsetsInitializer;
- boolean enableLakeSource = lakeSource != null;
+ boolean enableLakeSource = false;
switch (startupOptions.startupMode) {
case EARLIEST:
offsetsInitializer = OffsetsInitializer.earliest();
break;
case LATEST:
offsetsInitializer = OffsetsInitializer.latest();
- // since it's scan from latest, don't consider lake data
- enableLakeSource = false;
break;
case FULL:
offsetsInitializer = OffsetsInitializer.full();
+ // when it's full mode and lake source is not null,
+ // enable lake source as the historical data
+ enableLakeSource = lakeSource != null;
break;
case TIMESTAMP:
offsetsInitializer =
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
- if (hasPrimaryKey()) {
- // Currently, for primary key tables, we do not consider lake data
- // when reading from a given timestamp. This is because we will need
- // to read the change log of primary key table.
- // TODO: consider support it using paimon change log data?
- enableLakeSource = false;
- } else {
- if (enableLakeSource) {
- enableLakeSource = pushTimeStampFilterToLakeSource(lakeSource);
- }
- }
break;
default:
throw new IllegalArgumentException(
@@ -384,35 +368,6 @@ public class FlinkTableSource
}
}
- private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
- // will push timestamp to lake
- // we will have three additional system columns, __bucket, __offset, __timestamp
- // in lake, get the __timestamp index in lake table
- final int timestampFieldIndex = tableOutputType.getFieldCount() + 2;
- Predicate timestampFilter =
- new LeafPredicate(
- GreaterOrEqual.INSTANCE,
- DataTypes.TIMESTAMP_LTZ(),
- timestampFieldIndex,
- TIMESTAMP_COLUMN_NAME,
- Collections.singletonList(
-
TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
- List<Predicate> acceptedPredicates =
- lakeSource
-
.withFilters(Collections.singletonList(timestampFilter))
- .acceptedPredicates();
- if (acceptedPredicates.isEmpty()) {
- LOG.warn(
- "The lake source doesn't accept the filter {}, won't read
data from lake.",
- timestampFilter);
- return false;
- }
- checkState(
- acceptedPredicates.size() == 1
- && acceptedPredicates.get(0).equals(timestampFilter));
- return true;
- }
-
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
LookupNormalizer lookupNormalizer =
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
deleted file mode 100644
index 9cab9c7d4..000000000
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.paimon.flink;
-
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.MemorySize;
-import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
-import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.server.replica.Replica;
-import org.apache.fluss.server.testutils.FlussClusterExtension;
-import org.apache.fluss.utils.clock.ManualClock;
-
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
-import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
-import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-
-/** The ITCase for Flink union read from a timestamp. */
-class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
-
- private static final ManualClock CLOCK = new ManualClock();
-
- @RegisterExtension
- public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
- FlussClusterExtension.builder()
- .setClusterConf(initConfig())
- .setNumOfTabletServers(3)
- .setClock(CLOCK)
- .build();
-
- private StreamTableEnvironment streamTEnv;
-
- protected static Configuration initConfig() {
- Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
- // set file size to 10b to make log segment roll frequently
- configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE,
MemorySize.parse("10b"));
- configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION,
Duration.ofMillis(100));
- return configuration;
- }
-
- @BeforeAll
- static void beforeAll() {
-
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
- }
-
- @BeforeEach
- public void beforeEach() {
- super.beforeEach();
- buildStreamTEnv();
- }
-
- @Override
- protected FlussClusterExtension getFlussClusterExtension() {
- return FLUSS_CLUSTER_EXTENSION;
- }
-
- @Test
- void testUnionReadFromTimestamp() throws Exception {
- // first of all, start tiering
- JobClient jobClient = buildTieringJob(execEnv);
- try {
- String tableName = "logTable_read_timestamp";
- TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
- long tableId = createLogTable(tablePath, 1);
- TableBucket t1Bucket = new TableBucket(tableId, 0);
-
- List<Row> rows = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- rows.addAll(writeRows(tablePath, 3));
- // each round advance 1s to make sure each round of writing has
- // different timestamp
- CLOCK.advanceTime(Duration.ofSeconds(1));
- }
- assertReplicaStatus(t1Bucket, rows.size());
-
- Replica t1Replica =
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
-
- // wait util only 2(default keep 2 segments in local) log segments
in local
- waitUtil(
- () -> t1Replica.getLogTablet().logSegments().size() == 2,
- Duration.ofMinutes(1),
- "Fail to wait util only 2 segments in local.");
-
- // advance 10 days to mock remote log ttl
- CLOCK.advanceTime(Duration.ofDays(10));
- // wait util remote log ttl, should can't fetch from remote log
for offset 10
- waitUtil(
- () -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
- Duration.ofMinutes(1),
- "Fail to wait log offset 10 ttl from remote log.");
-
- // verify scan from timestamp 0, should read full data
- assertRowResultsIgnoreOrder(
- streamTEnv
- .executeSql(
- "select * from "
- + tableName
- + " /*+
OPTIONS('scan.startup.mode' = 'timestamp',\n"
- + "'scan.startup.timestamp' = '0')
*/")
- .collect(),
- rows,
- true);
-
- // verify scan from timestamp 2000, shouldn't read the rows
written in first two
- // rounds,
- CloseableIterator<Row> actualRows =
- streamTEnv
- .executeSql(
- "select b from "
- + tableName
- + " /*+
OPTIONS('scan.startup.mode' = 'timestamp',\n"
- + "'scan.startup.timestamp' =
'2000') */")
- .collect();
- List<Row> expectedRows =
- rows.stream()
- .skip(2 * 3)
- .map(row -> Row.of(row.getField(1)))
- .collect(Collectors.toList());
- assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
-
- // verify scan from earliest
- assertRowResultsIgnoreOrder(
- streamTEnv
- .executeSql(
- "select * from "
- + tableName
- + " /*+
OPTIONS('scan.startup.mode' = 'earliest') */")
- .collect(),
- rows,
- true);
-
- } finally {
- jobClient.cancel();
- }
- }
-
- private List<Row> writeRows(TablePath tablePath, int rows) throws
Exception {
- List<InternalRow> writtenRows = new ArrayList<>();
- List<Row> flinkRow = new ArrayList<>();
- for (int i = 0; i < rows; i++) {
- writtenRows.add(row(i, "v" + i));
- flinkRow.add(Row.of(i, "v" + i));
- }
- writeRows(tablePath, writtenRows, true);
- return flinkRow;
- }
-
- private void buildStreamTEnv() {
- String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
- // create table environment
- streamTEnv = StreamTableEnvironment.create(execEnv,
EnvironmentSettings.inStreamingMode());
- // crate catalog using sql
- streamTEnv.executeSql(
- String.format(
- "create catalog %s with ('type' = 'fluss', '%s' =
'%s')",
- CATALOG_NAME, BOOTSTRAP_SERVERS.key(),
bootstrapServers));
- streamTEnv.executeSql("use catalog " + CATALOG_NAME);
- streamTEnv.executeSql("use " + DEFAULT_DB);
- }
-}