This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 9478259  feat: introduce time travel for data fusion (#195)
9478259 is described below

commit 9478259467002b9976f982ff2ad53aa7fec5aac9
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 3 15:21:29 2026 +0800

    feat: introduce time travel for data fusion (#195)
    
    * feat: introduce time travel for data fusion
---
 crates/integrations/datafusion/Cargo.toml          |   2 +
 crates/integrations/datafusion/src/lib.rs          |   2 +
 .../datafusion/src/relation_planner.rs             | 189 +++++++++++++++++++++
 .../integrations/datafusion/tests/read_tables.rs   |  63 ++++++-
 crates/paimon/src/spec/core_options.rs             |  16 ++
 crates/paimon/src/spec/schema.rs                   |   7 +
 crates/paimon/src/table/mod.rs                     |  11 ++
 crates/paimon/src/table/snapshot_manager.rs        | 175 +++++++++++++++----
 crates/paimon/src/table/table_scan.rs              |  30 +++-
 dev/spark/provision.py                             |  26 +++
 10 files changed, 483 insertions(+), 38 deletions(-)

diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml
index c9673ec..e6ac2d3 100644
--- a/crates/integrations/datafusion/Cargo.toml
+++ b/crates/integrations/datafusion/Cargo.toml
@@ -26,9 +26,11 @@ keywords = ["paimon", "datafusion", "integrations"]
 
 [dependencies]
 async-trait = "0.1"
+chrono = "0.4"
 datafusion = { version = "52.3.0"}
 paimon = { path = "../../paimon" }
 futures = "0.3"
 
 [dev-dependencies]
+serde_json = "1"
 tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 40639e2..cd73831 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -40,11 +40,13 @@ mod catalog;
 mod error;
 mod filter_pushdown;
 mod physical_plan;
+mod relation_planner;
 mod schema;
 mod table;
 
 pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider};
 pub use error::to_datafusion_error;
 pub use physical_plan::PaimonTableScan;
+pub use relation_planner::PaimonRelationPlanner;
 pub use schema::paimon_schema_to_arrow;
 pub use table::PaimonTableProvider;
diff --git a/crates/integrations/datafusion/src/relation_planner.rs b/crates/integrations/datafusion/src/relation_planner.rs
new file mode 100644
index 0000000..1425a5b
--- /dev/null
+++ b/crates/integrations/datafusion/src/relation_planner.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Custom [`RelationPlanner`] for Paimon time travel via `FOR SYSTEM_TIME AS OF`.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use datafusion::catalog::default_table_source::{provider_as_source, 
source_as_provider};
+use datafusion::common::TableReference;
+use datafusion::error::Result as DFResult;
+use datafusion::logical_expr::builder::LogicalPlanBuilder;
+use datafusion::logical_expr::planner::{
+    PlannedRelation, RelationPlanner, RelationPlannerContext, RelationPlanning,
+};
+use datafusion::sql::sqlparser::ast::{self, TableFactor, TableVersion};
+use paimon::spec::{SCAN_SNAPSHOT_ID_OPTION, SCAN_TIMESTAMP_MILLIS_OPTION};
+
+use crate::table::PaimonTableProvider;
+
+/// A [`RelationPlanner`] that intercepts `FOR SYSTEM_TIME AS OF` clauses
+/// on Paimon tables and resolves them to time travel options.
+///
+/// - Integer literal → sets `scan.snapshot-id` option on the table.
+/// - String literal → parsed as a timestamp, sets `scan.timestamp-millis` option.
+#[derive(Debug)]
+pub struct PaimonRelationPlanner;
+
+impl PaimonRelationPlanner {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl Default for PaimonRelationPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RelationPlanner for PaimonRelationPlanner {
+    fn plan_relation(
+        &self,
+        relation: TableFactor,
+        context: &mut dyn RelationPlannerContext,
+    ) -> DFResult<RelationPlanning> {
+        // Only handle Table factors with a version clause.
+        let TableFactor::Table {
+            ref name,
+            ref version,
+            ..
+        } = relation
+        else {
+            return Ok(RelationPlanning::Original(relation));
+        };
+
+        let version_expr = match version {
+            Some(TableVersion::ForSystemTimeAsOf(expr)) => expr.clone(),
+            _ => return Ok(RelationPlanning::Original(relation)),
+        };
+
+        // Resolve the table reference.
+        let table_ref = object_name_to_table_reference(name, context)?;
+        let source = context
+            .context_provider()
+            .get_table_source(table_ref.clone())?;
+        let provider = source_as_provider(&source)?;
+
+        // Check if this is a Paimon table.
+        let Some(paimon_provider) = 
provider.as_any().downcast_ref::<PaimonTableProvider>() else {
+            return Ok(RelationPlanning::Original(relation));
+        };
+
+        let extra_options = resolve_time_travel_options(&version_expr)?;
+        let new_table = 
paimon_provider.table().copy_with_options(extra_options);
+        let new_provider = PaimonTableProvider::try_new(new_table)?;
+        let new_source = provider_as_source(Arc::new(new_provider));
+
+        // Destructure to get alias.
+        let TableFactor::Table { alias, .. } = relation else {
+            unreachable!()
+        };
+
+        let plan = LogicalPlanBuilder::scan(table_ref, new_source, 
None)?.build()?;
+        Ok(RelationPlanning::Planned(PlannedRelation::new(plan, alias)))
+    }
+}
+
+/// Convert a sqlparser `ObjectName` to a DataFusion `TableReference`.
+fn object_name_to_table_reference(
+    name: &ast::ObjectName,
+    context: &mut dyn RelationPlannerContext,
+) -> DFResult<TableReference> {
+    let idents: Vec<String> = name
+        .0
+        .iter()
+        .map(|part| {
+            let ident = part.as_ident().ok_or_else(|| {
+                datafusion::error::DataFusionError::Plan(format!(
+                    "Expected simple identifier in table reference, got: 
{part}"
+                ))
+            })?;
+            Ok(context.normalize_ident(ident.clone()))
+        })
+        .collect::<DFResult<_>>()?;
+    match idents.len() {
+        1 => Ok(TableReference::bare(idents[0].clone())),
+        2 => Ok(TableReference::partial(
+            idents[0].clone(),
+            idents[1].clone(),
+        )),
+        3 => Ok(TableReference::full(
+            idents[0].clone(),
+            idents[1].clone(),
+            idents[2].clone(),
+        )),
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "Unsupported table reference: {name}"
+        ))),
+    }
+}
+
+/// Resolve `FOR SYSTEM_TIME AS OF <expr>` into table options.
+///
+/// - Integer literal → `{"scan.snapshot-id": "N"}`
+/// - String literal (timestamp) → parse to millis → `{"scan.timestamp-millis": "M"}`
+fn resolve_time_travel_options(expr: &ast::Expr) -> DFResult<HashMap<String, 
String>> {
+    match expr {
+        ast::Expr::Value(v) => match &v.value {
+            ast::Value::Number(n, _) => {
+                // Validate it's a valid integer
+                n.parse::<i64>().map_err(|e| {
+                    datafusion::error::DataFusionError::Plan(format!(
+                        "Invalid snapshot id '{n}': {e}"
+                    ))
+                })?;
+                Ok(HashMap::from([(
+                    SCAN_SNAPSHOT_ID_OPTION.to_string(),
+                    n.clone(),
+                )]))
+            }
+            ast::Value::SingleQuotedString(s) | 
ast::Value::DoubleQuotedString(s) => {
+                let timestamp_millis = parse_timestamp_to_millis(s)?;
+                Ok(HashMap::from([(
+                    SCAN_TIMESTAMP_MILLIS_OPTION.to_string(),
+                    timestamp_millis.to_string(),
+                )]))
+            }
+            _ => Err(datafusion::error::DataFusionError::Plan(format!(
+                "Unsupported time travel expression: {expr}"
+            ))),
+        },
+        _ => Err(datafusion::error::DataFusionError::Plan(format!(
+            "Unsupported time travel expression: {expr}. Expected an integer 
snapshot id or a timestamp string."
+        ))),
+    }
+}
+
+/// Parse a timestamp string to milliseconds since epoch (using local timezone).
+///
+/// Matches Java Paimon's behavior which uses `TimeZone.getDefault()`.
+fn parse_timestamp_to_millis(ts: &str) -> DFResult<i64> {
+    use chrono::{Local, NaiveDateTime, TimeZone};
+
+    let naive = NaiveDateTime::parse_from_str(ts, "%Y-%m-%d 
%H:%M:%S").map_err(|e| {
+        datafusion::error::DataFusionError::Plan(format!(
+            "Cannot parse time travel timestamp '{ts}': {e}. Expected format: 
YYYY-MM-DD HH:MM:SS"
+        ))
+    })?;
+    let local = Local.from_local_datetime(&naive).single().ok_or_else(|| {
+        datafusion::error::DataFusionError::Plan(format!("Ambiguous or invalid 
local time: '{ts}'"))
+    })?;
+    Ok(local.timestamp_millis())
+}
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index 89f0dd0..8f831d7 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -24,7 +24,7 @@ use datafusion::logical_expr::{col, lit, 
TableProviderFilterPushDown};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use paimon::catalog::Identifier;
 use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
-use paimon_datafusion::{PaimonCatalogProvider, PaimonTableProvider};
+use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, 
PaimonTableProvider};
 
 fn get_test_warehouse() -> String {
     std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| 
"/tmp/paimon-warehouse".to_string())
@@ -363,3 +363,64 @@ async fn test_missing_database_returns_no_schema() {
         "missing databases should not resolve to a schema provider"
     );
 }
+
+// ======================= Time Travel Tests =======================
+
+/// Helper: create a SessionContext with catalog + relation planner for time travel.
+/// Uses BigQuery dialect to enable `FOR SYSTEM_TIME AS OF` syntax.
+async fn create_time_travel_context() -> SessionContext {
+    let catalog = create_catalog();
+    let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", 
"BigQuery");
+    let ctx = SessionContext::new_with_config(config);
+    ctx.register_catalog(
+        "paimon",
+        Arc::new(PaimonCatalogProvider::new(Arc::new(catalog))),
+    );
+    ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
+        .expect("Failed to register relation planner");
+    ctx
+}
+
+#[tokio::test]
+async fn test_time_travel_by_snapshot_id() {
+    let ctx = create_time_travel_context().await;
+
+    // Snapshot 1: should contain only the first insert (alice, bob)
+    let batches = ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 1")
+        .await
+        .expect("time travel query should parse")
+        .collect()
+        .await
+        .expect("time travel query should execute");
+
+    let mut rows = extract_id_name_rows(&batches);
+    rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        rows,
+        vec![(1, "alice".to_string()), (2, "bob".to_string())],
+        "Snapshot 1 should contain only the first batch of rows"
+    );
+
+    // Snapshot 2 (latest): should contain all rows
+    let batches = ctx
+        .sql("SELECT id, name FROM paimon.default.time_travel_table FOR 
SYSTEM_TIME AS OF 2")
+        .await
+        .expect("time travel query should parse")
+        .collect()
+        .await
+        .expect("time travel query should execute");
+
+    let mut rows = extract_id_name_rows(&batches);
+    rows.sort_by_key(|(id, _)| *id);
+    assert_eq!(
+        rows,
+        vec![
+            (1, "alice".to_string()),
+            (2, "bob".to_string()),
+            (3, "carol".to_string()),
+            (4, "dave".to_string()),
+        ],
+        "Snapshot 2 should contain all rows"
+    );
+}
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 0f15922..aeae23c 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -23,6 +23,8 @@ const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = 
"source.split.target-size";
 const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost";
 const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name";
 const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name";
+pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id";
+pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis";
 const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024;
 const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024;
 const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__";
@@ -88,6 +90,20 @@ impl<'a> CoreOptions<'a> {
             .map(|v| v.eq_ignore_ascii_case("true"))
             .unwrap_or(true)
     }
+
+    /// Snapshot id for time travel via `scan.snapshot-id`.
+    pub fn scan_snapshot_id(&self) -> Option<i64> {
+        self.options
+            .get(SCAN_SNAPSHOT_ID_OPTION)
+            .and_then(|v| v.parse().ok())
+    }
+
+    /// Timestamp in millis for time travel via `scan.timestamp-millis`.
+    pub fn scan_timestamp_millis(&self) -> Option<i64> {
+        self.options
+            .get(SCAN_TIMESTAMP_MILLIS_OPTION)
+            .and_then(|v| v.parse().ok())
+    }
 }
 
 /// Parse a memory size string to bytes using binary (1024-based) semantics.
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index 06cdd11..c1047ad 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -94,6 +94,13 @@ impl TableSchema {
         &self.options
     }
 
+    /// Create a copy of this schema with extra options merged in.
+    pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
+        let mut new_schema = self.clone();
+        new_schema.options.extend(extra);
+        new_schema
+    }
+
     pub fn comment(&self) -> Option<&str> {
         self.comment.as_deref()
     }
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 7c3f9a5..d936d7a 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -34,6 +34,7 @@ pub use table_scan::TableScan;
 use crate::catalog::Identifier;
 use crate::io::FileIO;
 use crate::spec::TableSchema;
+use std::collections::HashMap;
 
 /// Table represents a table in the catalog.
 #[derive(Debug, Clone)]
@@ -87,6 +88,16 @@ impl Table {
     pub fn new_read_builder(&self) -> ReadBuilder<'_> {
         ReadBuilder::new(self)
     }
+
+    /// Create a copy of this table with extra options merged into the schema.
+    pub fn copy_with_options(&self, extra: HashMap<String, String>) -> Self {
+        Self {
+            file_io: self.file_io.clone(),
+            identifier: self.identifier.clone(),
+            location: self.location.clone(),
+            schema: self.schema.copy_with_options(extra),
+        }
+    }
 }
 
 /// A stream of arrow [`RecordBatch`]es.
diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs
index 4f979a5..6d8eab2 100644
--- a/crates/paimon/src/table/snapshot_manager.rs
+++ b/crates/paimon/src/table/snapshot_manager.rs
@@ -18,15 +18,14 @@
 //! Snapshot manager for reading snapshot metadata using FileIO.
 //!
 //! Reference:[org.apache.paimon.utils.SnapshotManager](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java).
-// TODO: remove when SnapshotManager is used (e.g. from Table or source planning).
-#![allow(dead_code)]
-
 use crate::io::FileIO;
 use crate::spec::Snapshot;
 use std::str;
 
 const SNAPSHOT_DIR: &str = "snapshot";
-const LATEST_SNAPSHOT_FILE: &str = "LATEST";
+const SNAPSHOT_PREFIX: &str = "snapshot-";
+const LATEST_HINT: &str = "LATEST";
+const EARLIEST_HINT: &str = "EARLIEST";
 
 /// Manager for snapshot files using unified FileIO.
 ///
@@ -51,9 +50,14 @@ impl SnapshotManager {
         format!("{}/{}", self.table_path, SNAPSHOT_DIR)
     }
 
-    /// Path to the LATEST file that stores the latest snapshot id.
-    pub fn latest_file_path(&self) -> String {
-        format!("{}/{}", self.snapshot_dir(), LATEST_SNAPSHOT_FILE)
+    /// Path to the LATEST hint file.
+    fn latest_hint_path(&self) -> String {
+        format!("{}/{}", self.snapshot_dir(), LATEST_HINT)
+    }
+
+    /// Path to the EARLIEST hint file.
+    fn earliest_hint_path(&self) -> String {
+        format!("{}/{}", self.snapshot_dir(), EARLIEST_HINT)
     }
 
     /// Path to the snapshot file for the given id (e.g. `snapshot/snapshot-1`).
@@ -61,36 +65,87 @@ impl SnapshotManager {
         format!("{}/snapshot-{}", self.snapshot_dir(), snapshot_id)
     }
 
-    /// Get the latest snapshot, or None if LATEST does not exist.
-    /// Returns an error if LATEST exists but the snapshot file (snapshot-{id}) does not exist.
-    pub async fn get_latest_snapshot(&self) -> crate::Result<Option<Snapshot>> 
{
-        // todo: consider snapshot loader to load snapshot from catalog
-        let latest_path = self.latest_file_path();
-        let input = self.file_io.new_input(&latest_path)?;
-        if !input.exists().await? {
-            // todo: may need to list directory and find the latest snapshot
-            return Ok(None);
+    /// Read a hint file and return the id, or None if the file does not exist,
+    /// is being deleted, or contains invalid content.
+    ///
+    /// Reference: [HintFileUtils.readHint](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+    async fn read_hint(&self, path: &str) -> Option<i64> {
+        let input = self.file_io.new_input(path).ok()?;
+        // Try to read directly without exists() check to avoid TOCTOU race.
+        // The file may be deleted or overwritten concurrently.
+        let content = input.read().await.ok()?;
+        let id_str = str::from_utf8(&content).ok()?;
+        id_str.trim().parse().ok()
+    }
+
+    /// List snapshot files and find the id using the given reducer (min or max).
+    async fn find_by_list_files(&self, reducer: fn(i64, i64) -> i64) -> 
crate::Result<Option<i64>> {
+        let snapshot_dir = self.snapshot_dir();
+        let statuses = self.file_io.list_status(&snapshot_dir).await?;
+        let mut result: Option<i64> = None;
+        for status in statuses {
+            if status.is_dir {
+                continue;
+            }
+            let name = status.path.rsplit('/').next().unwrap_or(&status.path);
+            if let Some(id_str) = name.strip_prefix(SNAPSHOT_PREFIX) {
+                if let Ok(id) = id_str.parse::<i64>() {
+                    result = Some(match result {
+                        Some(r) => reducer(r, id),
+                        None => id,
+                    });
+                }
+            }
         }
-        let content = input.read().await?;
-        let id_str = str::from_utf8(&content).map_err(|e| 
crate::Error::DataInvalid {
-            message: "LATEST snapshot file invalid utf8".to_string(),
-            source: Some(Box::new(e)),
-        })?;
-        let snapshot_id: i64 = id_str
-            .trim()
-            .parse()
-            .map_err(|e| crate::Error::DataInvalid {
-                message: format!("LATEST snapshot id not a number: 
{id_str:?}"),
-                source: Some(Box::new(e)),
-            })?;
+        Ok(result)
+    }
+
+    /// Get the latest snapshot id.
+    ///
+    /// First tries the LATEST hint file. If the hint is valid and no next snapshot
+    /// exists, returns it. Otherwise falls back to listing snapshot files.
+    ///
+    /// Reference: [HintFileUtils.findLatest](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+    pub async fn get_latest_snapshot_id(&self) -> crate::Result<Option<i64>> {
+        let hint_path = self.latest_hint_path();
+        if let Some(hint_id) = self.read_hint(&hint_path).await {
+            if hint_id > 0 {
+                let next_path = self.snapshot_path(hint_id + 1);
+                let next_input = self.file_io.new_input(&next_path)?;
+                if !next_input.exists().await? {
+                    return Ok(Some(hint_id));
+                }
+            }
+        }
+        self.find_by_list_files(i64::max).await
+    }
+
+    /// Get the earliest snapshot id.
+    ///
+    /// First tries the EARLIEST hint file. If the hint is valid and the snapshot
+    /// file exists, returns it. Otherwise falls back to listing snapshot files.
+    ///
+    /// Reference: [HintFileUtils.findEarliest](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java)
+    pub async fn earliest_snapshot_id(&self) -> crate::Result<Option<i64>> {
+        let hint_path = self.earliest_hint_path();
+        if let Some(hint_id) = self.read_hint(&hint_path).await {
+            let snap_path = self.snapshot_path(hint_id);
+            let snap_input = self.file_io.new_input(&snap_path)?;
+            if snap_input.exists().await? {
+                return Ok(Some(hint_id));
+            }
+        }
+        self.find_by_list_files(i64::min).await
+    }
+
+    /// Get a snapshot by id. Returns an error if the snapshot file does not exist.
+    pub async fn get_snapshot(&self, snapshot_id: i64) -> 
crate::Result<Snapshot> {
         let snapshot_path = self.snapshot_path(snapshot_id);
         let snap_input = self.file_io.new_input(&snapshot_path)?;
         if !snap_input.exists().await? {
             return Err(crate::Error::DataInvalid {
-                message: format!(
-                    "snapshot file does not exist: {snapshot_path} (LATEST 
points to snapshot id {snapshot_id})"
-                ),
-                source: None
+                message: format!("snapshot file does not exist: 
{snapshot_path}"),
+                source: None,
             });
         }
         let snap_bytes = snap_input.read().await?;
@@ -102,12 +157,66 @@ impl SnapshotManager {
         if snapshot.id() != snapshot_id {
             return Err(crate::Error::DataInvalid {
                 message: format!(
-                    "snapshot file id mismatch: LATEST points to 
{snapshot_id}, but file contains snapshot id {}",
+                    "snapshot file id mismatch: in file name is {snapshot_id}, 
but file contains snapshot id {}",
                     snapshot.id()
                 ),
                 source: None
             });
         }
+        Ok(snapshot)
+    }
+
+    /// Get the latest snapshot, or None if no snapshots exist.
+    pub async fn get_latest_snapshot(&self) -> crate::Result<Option<Snapshot>> 
{
+        let snapshot_id = match self.get_latest_snapshot_id().await? {
+            Some(id) => id,
+            None => return Ok(None),
+        };
+        let snapshot = self.get_snapshot(snapshot_id).await?;
         Ok(Some(snapshot))
     }
+
+    /// Returns the snapshot whose commit time is earlier than or equal to the given
+    /// `timestamp_millis`. If no such snapshot exists, returns None.
+    ///
+    /// Uses binary search over snapshot IDs (assumes monotonically increasing commit times).
+    ///
+    /// Reference: [SnapshotManager.earlierOrEqualTimeMills](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java)
+    pub async fn earlier_or_equal_time_mills(
+        &self,
+        timestamp_millis: i64,
+    ) -> crate::Result<Option<Snapshot>> {
+        let mut latest = match self.get_latest_snapshot_id().await? {
+            Some(id) => id,
+            None => return Ok(None),
+        };
+
+        let earliest_snapshot = match self.earliest_snapshot_id().await? {
+            Some(id) => self.get_snapshot(id).await?,
+            None => return Ok(None),
+        };
+
+        // If the earliest snapshot is already after the timestamp, no match.
+        if (earliest_snapshot.time_millis() as i64) > timestamp_millis {
+            return Ok(None);
+        }
+        let mut earliest = earliest_snapshot.id();
+
+        let mut result: Option<Snapshot> = None;
+        while earliest <= latest {
+            let mid = earliest + (latest - earliest) / 2;
+            let snapshot = self.get_snapshot(mid).await?;
+            let commit_time = snapshot.time_millis() as i64;
+            if commit_time > timestamp_millis {
+                latest = mid - 1;
+            } else if commit_time < timestamp_millis {
+                earliest = mid + 1;
+                result = Some(snapshot);
+            } else {
+                result = Some(snapshot);
+                break;
+            }
+        }
+        Ok(result)
+    }
 }
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index 3988a17..027dd75 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -240,15 +240,37 @@ impl<'a> TableScan<'a> {
         }
     }
 
-    /// Plan the full scan: read latest snapshot, manifest list, manifest entries, then build DataSplits using bin packing.
+    /// Plan the full scan: resolve snapshot (via options or latest), then read manifests and build DataSplits.
+    ///
+    /// Time travel is resolved from table options:
+    /// - `scan.snapshot-id` → read that specific snapshot
+    /// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
+    /// - otherwise → read the latest snapshot
+    ///
+    /// Reference: [TimeTravelUtil.tryTravelToSnapshot](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java)
     pub async fn plan(&self) -> crate::Result<Plan> {
         let file_io = self.table.file_io();
         let table_path = self.table.location();
         let snapshot_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string());
+        let core_options = CoreOptions::new(self.table.schema().options());
 
-        let snapshot = match snapshot_manager.get_latest_snapshot().await? {
-            Some(s) => s,
-            None => return Ok(Plan::new(Vec::new())),
+        let snapshot = if let Some(id) = core_options.scan_snapshot_id() {
+            snapshot_manager.get_snapshot(id).await?
+        } else if let Some(ts) = core_options.scan_timestamp_millis() {
+            match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
+                Some(s) => s,
+                None => {
+                    return Err(Error::DataInvalid {
+                        message: format!("No snapshot found with timestamp <= 
{ts}"),
+                        source: None,
+                    })
+                }
+            }
+        } else {
+            match snapshot_manager.get_latest_snapshot().await? {
+                Some(s) => s,
+                None => return Ok(Plan::new(Vec::new())),
+            }
         };
         self.plan_snapshot(snapshot).await
     }
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index cdc538a..baa6a04 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -260,6 +260,32 @@ def main():
     )
     spark.sql("DROP TABLE data_evolution_updates")
 
+    # ===== Time travel table: multiple snapshots for time travel tests =====
+    # Snapshot 1: rows (1, 'alice'), (2, 'bob')
+    # Snapshot 2: rows (1, 'alice'), (2, 'bob'), (3, 'carol'), (4, 'dave')
+    spark.sql(
+        """
+        CREATE TABLE IF NOT EXISTS time_travel_table (
+            id INT,
+            name STRING
+        ) USING paimon
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO time_travel_table VALUES
+            (1, 'alice'),
+            (2, 'bob')
+        """
+    )
+    spark.sql(
+        """
+        INSERT INTO time_travel_table VALUES
+            (3, 'carol'),
+            (4, 'dave')
+        """
+    )
+
 
 if __name__ == "__main__":
     main()

Reply via email to