This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 939043e00 [flink] unify the behavior of different startup mode for 
lake table (#2572)
939043e00 is described below

commit 939043e00c8dd8f0387a355dbde207be988e0fe0
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Feb 6 14:16:23 2026 +0800

    [flink] unify the behavior of different startup mode for lake table (#2572)
---
 .../fluss/flink/source/FlinkTableSource.java       |  53 +-----
 .../flink/FlinkUnionReadFromTimestampITCase.java   | 194 ---------------------
 2 files changed, 4 insertions(+), 243 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index bbfbff402..8e234d86c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -35,14 +35,10 @@ import org.apache.fluss.metadata.ChangelogImage;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.predicate.GreaterOrEqual;
-import org.apache.fluss.predicate.LeafPredicate;
 import org.apache.fluss.predicate.PartitionPredicateVisitor;
 import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.predicate.PredicateBuilder;
 import org.apache.fluss.predicate.PredicateVisitor;
-import org.apache.fluss.row.TimestampLtz;
-import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
 
 import org.apache.flink.annotation.VisibleForTesting;
@@ -98,9 +94,7 @@ import static 
org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPred
 import static 
org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
 import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
 import static 
org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
-import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
-import static org.apache.fluss.utils.Preconditions.checkState;
 
 /** Flink table source to scan Fluss data. */
 public class FlinkTableSource
@@ -307,33 +301,23 @@ public class FlinkTableSource
             flussRowType = flussRowType.project(projectedFields);
         }
         OffsetsInitializer offsetsInitializer;
-        boolean enableLakeSource = lakeSource != null;
+        boolean enableLakeSource = false;
         switch (startupOptions.startupMode) {
             case EARLIEST:
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
                 offsetsInitializer = OffsetsInitializer.latest();
-                // since it's scan from latest, don't consider lake data
-                enableLakeSource = false;
                 break;
             case FULL:
                 offsetsInitializer = OffsetsInitializer.full();
+                // when it's full mode and lake source is not null,
+                // enable lake source as the historical data
+                enableLakeSource = lakeSource != null;
                 break;
             case TIMESTAMP:
                 offsetsInitializer =
                         
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
-                if (hasPrimaryKey()) {
-                    // Currently, for primary key tables, we do not consider 
lake data
-                    // when reading from a given timestamp. This is because we 
will need
-                    // to read the change log of primary key table.
-                    // TODO: consider support it using paimon change log data?
-                    enableLakeSource = false;
-                } else {
-                    if (enableLakeSource) {
-                        enableLakeSource = 
pushTimeStampFilterToLakeSource(lakeSource);
-                    }
-                }
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -384,35 +368,6 @@ public class FlinkTableSource
         }
     }
 
-    private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
-        // will push timestamp to lake
-        // we will have three additional system columns, __bucket, __offset, 
__timestamp
-        // in lake, get the  __timestamp index in lake table
-        final int timestampFieldIndex = tableOutputType.getFieldCount() + 2;
-        Predicate timestampFilter =
-                new LeafPredicate(
-                        GreaterOrEqual.INSTANCE,
-                        DataTypes.TIMESTAMP_LTZ(),
-                        timestampFieldIndex,
-                        TIMESTAMP_COLUMN_NAME,
-                        Collections.singletonList(
-                                
TimestampLtz.fromEpochMillis(startupOptions.startupTimestampMs)));
-        List<Predicate> acceptedPredicates =
-                lakeSource
-                        
.withFilters(Collections.singletonList(timestampFilter))
-                        .acceptedPredicates();
-        if (acceptedPredicates.isEmpty()) {
-            LOG.warn(
-                    "The lake source doesn't accept the filter {}, won't read 
data from lake.",
-                    timestampFilter);
-            return false;
-        }
-        checkState(
-                acceptedPredicates.size() == 1
-                        && acceptedPredicates.get(0).equals(timestampFilter));
-        return true;
-    }
-
     @Override
     public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
         LookupNormalizer lookupNormalizer =
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
deleted file mode 100644
index 9cab9c7d4..000000000
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadFromTimestampITCase.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.lake.paimon.flink;
-
-import org.apache.fluss.config.ConfigOptions;
-import org.apache.fluss.config.Configuration;
-import org.apache.fluss.config.MemorySize;
-import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
-import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.server.replica.Replica;
-import org.apache.fluss.server.testutils.FlussClusterExtension;
-import org.apache.fluss.utils.clock.ManualClock;
-
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil;
-import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
-import static org.apache.fluss.testutils.DataTestUtils.row;
-
-/** The ITCase for Flink union read from a timestamp. */
-class FlinkUnionReadFromTimestampITCase extends FlinkPaimonTieringTestBase {
-
-    private static final ManualClock CLOCK = new ManualClock();
-
-    @RegisterExtension
-    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
-            FlussClusterExtension.builder()
-                    .setClusterConf(initConfig())
-                    .setNumOfTabletServers(3)
-                    .setClock(CLOCK)
-                    .build();
-
-    private StreamTableEnvironment streamTEnv;
-
-    protected static Configuration initConfig() {
-        Configuration configuration = FlinkPaimonTieringTestBase.initConfig();
-        // set file size to 10b to make log segment roll frequently
-        configuration.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, 
MemorySize.parse("10b"));
-        configuration.set(ConfigOptions.REMOTE_LOG_TASK_INTERVAL_DURATION, 
Duration.ofMillis(100));
-        return configuration;
-    }
-
-    @BeforeAll
-    static void beforeAll() {
-        
FlinkPaimonTieringTestBase.beforeAll(FLUSS_CLUSTER_EXTENSION.getClientConfig());
-    }
-
-    @BeforeEach
-    public void beforeEach() {
-        super.beforeEach();
-        buildStreamTEnv();
-    }
-
-    @Override
-    protected FlussClusterExtension getFlussClusterExtension() {
-        return FLUSS_CLUSTER_EXTENSION;
-    }
-
-    @Test
-    void testUnionReadFromTimestamp() throws Exception {
-        // first of all, start tiering
-        JobClient jobClient = buildTieringJob(execEnv);
-        try {
-            String tableName = "logTable_read_timestamp";
-            TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
-            long tableId = createLogTable(tablePath, 1);
-            TableBucket t1Bucket = new TableBucket(tableId, 0);
-
-            List<Row> rows = new ArrayList<>();
-            for (int i = 0; i < 10; i++) {
-                rows.addAll(writeRows(tablePath, 3));
-                // each round advance 1s to make sure each round of writing has
-                // different timestamp
-                CLOCK.advanceTime(Duration.ofSeconds(1));
-            }
-            assertReplicaStatus(t1Bucket, rows.size());
-
-            Replica t1Replica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(t1Bucket);
-
-            // wait util only 2(default keep 2 segments in local) log segments 
in local
-            waitUtil(
-                    () -> t1Replica.getLogTablet().logSegments().size() == 2,
-                    Duration.ofMinutes(1),
-                    "Fail to wait util only 2 segments in local.");
-
-            // advance 10 days to mock remote log ttl
-            CLOCK.advanceTime(Duration.ofDays(10));
-            // wait util remote log ttl, should can't fetch from remote log 
for offset 10
-            waitUtil(
-                    () -> !t1Replica.getLogTablet().canFetchFromRemoteLog(10),
-                    Duration.ofMinutes(1),
-                    "Fail to wait log offset 10 ttl from remote log.");
-
-            // verify scan from timestamp 0, should read full data
-            assertRowResultsIgnoreOrder(
-                    streamTEnv
-                            .executeSql(
-                                    "select * from "
-                                            + tableName
-                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'timestamp',\n"
-                                            + "'scan.startup.timestamp' = '0') 
*/")
-                            .collect(),
-                    rows,
-                    true);
-
-            // verify scan from timestamp 2000, shouldn't read the rows 
written in first two
-            // rounds,
-            CloseableIterator<Row> actualRows =
-                    streamTEnv
-                            .executeSql(
-                                    "select b from "
-                                            + tableName
-                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'timestamp',\n"
-                                            + "'scan.startup.timestamp' = 
'2000') */")
-                            .collect();
-            List<Row> expectedRows =
-                    rows.stream()
-                            .skip(2 * 3)
-                            .map(row -> Row.of(row.getField(1)))
-                            .collect(Collectors.toList());
-            assertRowResultsIgnoreOrder(actualRows, expectedRows, true);
-
-            // verify scan from earliest
-            assertRowResultsIgnoreOrder(
-                    streamTEnv
-                            .executeSql(
-                                    "select * from "
-                                            + tableName
-                                            + " /*+ 
OPTIONS('scan.startup.mode' = 'earliest') */")
-                            .collect(),
-                    rows,
-                    true);
-
-        } finally {
-            jobClient.cancel();
-        }
-    }
-
-    private List<Row> writeRows(TablePath tablePath, int rows) throws 
Exception {
-        List<InternalRow> writtenRows = new ArrayList<>();
-        List<Row> flinkRow = new ArrayList<>();
-        for (int i = 0; i < rows; i++) {
-            writtenRows.add(row(i, "v" + i));
-            flinkRow.add(Row.of(i, "v" + i));
-        }
-        writeRows(tablePath, writtenRows, true);
-        return flinkRow;
-    }
-
-    private void buildStreamTEnv() {
-        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
-        // create table environment
-        streamTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
-        // crate catalog using sql
-        streamTEnv.executeSql(
-                String.format(
-                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
-                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
-        streamTEnv.executeSql("use catalog " + CATALOG_NAME);
-        streamTEnv.executeSql("use " + DEFAULT_DB);
-    }
-}

Reply via email to