This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 6635792 fix(scan): Harden time-travel selector validation (#219)
6635792 is described below
commit 663579236e4ba38aeb34fb252cd22214a34bf20b
Author: Zach <[email protected]>
AuthorDate: Mon Apr 6 21:23:55 2026 +0800
fix(scan): Harden time-travel selector validation (#219)
---
crates/integration_tests/tests/read_tables.rs | 64 ++++++++
.../integrations/datafusion/tests/read_tables.rs | 81 +++++++++++
crates/paimon/src/spec/core_options.rs | 162 ++++++++++++++++++++-
crates/paimon/src/spec/mod.rs | 1 +
crates/paimon/src/table/table_scan.rs | 51 ++++---
5 files changed, 332 insertions(+), 27 deletions(-)
diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs
index 008b081..cbe14ee 100644
--- a/crates/integration_tests/tests/read_tables.rs
+++ b/crates/integration_tests/tests/read_tables.rs
@@ -1799,6 +1799,70 @@ async fn test_time_travel_by_tag_name() {
);
}
+/// Planning must fail when `scan.tag-name` and `scan.snapshot-id` are both set.
+#[tokio::test]
+async fn test_time_travel_conflicting_selectors_fail() {
+ let catalog = create_file_system_catalog();
+ let base_table = get_table_from_catalog(&catalog, "time_travel_table").await;
+
+ let mut selector_options = HashMap::new();
+ selector_options.insert("scan.tag-name".to_string(), "snapshot1".to_string());
+ selector_options.insert("scan.snapshot-id".to_string(), "2".to_string());
+ let conflicted = base_table.copy_with_options(selector_options);
+
+ let outcome = conflicted.new_read_builder().new_scan().plan().await;
+ let message = match outcome.expect_err("conflicting time-travel selectors should fail") {
+ Error::DataInvalid { message, .. } => message,
+ other => panic!("unexpected error: {other:?}"),
+ };
+
+ assert!(
+ message.contains("Only one time-travel selector may be set"),
+ "unexpected conflict error: {message}"
+ );
+ // Both conflicting keys must be named, snapshot id checked first.
+ for option_key in ["scan.snapshot-id", "scan.tag-name"] {
+ assert!(
+ message.contains(option_key),
+ "conflict error should mention {option_key}: {message}"
+ );
+ }
+}
+
+/// A non-numeric `scan.snapshot-id` must be rejected when the scan is planned.
+#[tokio::test]
+async fn test_time_travel_invalid_numeric_selector_fails() {
+ let catalog = create_file_system_catalog();
+ let table = get_table_from_catalog(&catalog, "time_travel_table").await;
+
+ let bad_selector = ("scan.snapshot-id".to_string(), "not-a-number".to_string());
+ let invalid = table.copy_with_options(HashMap::from([bad_selector]));
+
+ let outcome = invalid.new_read_builder().new_scan().plan().await;
+ let message = match outcome.expect_err("invalid numeric time-travel selector should fail") {
+ Error::DataInvalid { message, .. } => message,
+ other => panic!("unexpected error: {other:?}"),
+ };
+
+ assert!(
+ message.contains("Invalid value for scan.snapshot-id"),
+ "unexpected invalid selector error: {message}"
+ );
+}
+
// ---------------------------------------------------------------------------
// Data evolution + drop column tests
// ---------------------------------------------------------------------------
diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs
index d438720..d3966ca 100644
--- a/crates/integrations/datafusion/tests/read_tables.rs
+++ b/crates/integrations/datafusion/tests/read_tables.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::sync::Arc;
use datafusion::arrow::array::{Array, Int32Array, StringArray};
@@ -57,6 +58,21 @@ async fn create_provider(table_name: &str) -> PaimonTableProvider {
PaimonTableProvider::try_new(table).expect("Failed to create table
provider")
}
+/// Loads `default.<table_name>` from the test catalog, applies `extra_options`
+/// via `copy_with_options`, and wraps the result in a `PaimonTableProvider`.
+///
+/// Panics (test helper) if the table cannot be loaded or the provider cannot
+/// be created.
+async fn create_provider_with_options(
+ table_name: &str,
+ extra_options: HashMap<String, String>,
+) -> PaimonTableProvider {
+ let catalog = create_catalog();
+ let identifier = Identifier::new("default", table_name);
+ let table = catalog
+ .get_table(&identifier)
+ .await
+ .expect("Failed to get table")
+ .copy_with_options(extra_options);
+
+ PaimonTableProvider::try_new(table).expect("Failed to create table provider")
+}
+
async fn read_rows(table_name: &str) -> Vec<(i32, String)> {
let batches = collect_query(table_name, &format!("SELECT id, name FROM
{table_name}"))
.await
@@ -469,6 +485,71 @@ async fn test_time_travel_by_tag_name() {
);
}
+/// Combines a `scan.tag-name` table option with a `FOR SYSTEM_TIME AS OF 2`
+/// query and expects execution to fail with a conflict error that names both
+/// `scan.snapshot-id` and `scan.tag-name`.
+#[tokio::test]
+async fn test_time_travel_conflicting_selectors_fail() {
+ let provider = create_provider_with_options(
+ "time_travel_table",
+ HashMap::from([("scan.tag-name".to_string(), "snapshot1".to_string())]),
+ )
+ .await;
+
+ // NOTE: the BigQuery dialect plus PaimonRelationPlanner are presumably what
+ // make `FOR SYSTEM_TIME AS OF` parse and plan here.
+ let config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "BigQuery");
+ let ctx = SessionContext::new_with_config(config);
+ ctx.register_table("time_travel_table", Arc::new(provider))
+ .expect("Failed to register table");
+ ctx.register_relation_planner(Arc::new(PaimonRelationPlanner::new()))
+ .expect("Failed to register relation planner");
+
+ let err = ctx
+ .sql("SELECT id, name FROM time_travel_table FOR SYSTEM_TIME AS OF 2")
+ .await
+ .expect("time travel query should parse")
+ .collect()
+ .await
+ .expect_err("conflicting time-travel selectors should fail");
+
+ let message = err.to_string();
+ assert!(
+ message.contains("Only one time-travel selector may be set"),
+ "unexpected conflict error: {message}"
+ );
+ assert!(
+ message.contains("scan.snapshot-id"),
+ "conflict error should mention scan.snapshot-id: {message}"
+ );
+ assert!(
+ message.contains("scan.tag-name"),
+ "conflict error should mention scan.tag-name: {message}"
+ );
+}
+
+/// A non-numeric `scan.snapshot-id` table option must make a plain (no time
+/// travel clause) query fail with an "Invalid value" error.
+#[tokio::test]
+async fn test_time_travel_invalid_numeric_selector_fails() {
+ let provider = create_provider_with_options(
+ "time_travel_table",
+ HashMap::from([("scan.snapshot-id".to_string(), "not-a-number".to_string())]),
+ )
+ .await;
+
+ let ctx = SessionContext::new();
+ ctx.register_table("time_travel_table", Arc::new(provider))
+ .expect("Failed to register table");
+
+ let err = ctx
+ .sql("SELECT id, name FROM time_travel_table")
+ .await
+ .expect("query should parse")
+ .collect()
+ .await
+ .expect_err("invalid numeric time-travel selector should fail");
+
+ let message = err.to_string();
+ assert!(
+ message.contains("Invalid value for scan.snapshot-id"),
+ "unexpected invalid selector error: {message}"
+ );
+}
+
/// Verifies that data evolution merge correctly NULL-fills columns that no file in a
/// merge group provides (e.g. a newly added column after MERGE INTO on old rows).
/// Without the fix, `active_file_indices` would be empty and rows would be silently lost.
diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index 94f35d6..1d2d6a4 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -40,6 +40,13 @@ pub struct CoreOptions<'a> {
options: &'a HashMap<String, String>,
}
+/// Normalized time-travel target derived from the `scan.*` table options.
+///
+/// Built by `CoreOptions::try_time_travel_selector`, which rejects
+/// configurations where more than one selector option is present.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub(crate) enum TimeTravelSelector<'a> {
+ /// From `scan.tag-name`: read the snapshot pinned by this tag.
+ TagName(&'a str),
+ /// From `scan.snapshot-id`: read exactly this snapshot.
+ SnapshotId(i64),
+ /// From `scan.timestamp-millis`: read the latest snapshot at or before it.
+ TimestampMillis(i64),
+}
+
impl<'a> CoreOptions<'a> {
pub fn new(options: &'a HashMap<String, String>) -> Self {
Self { options }
@@ -94,25 +101,90 @@ impl<'a> CoreOptions<'a> {
.unwrap_or(true)
}
- /// Snapshot id for time travel via `scan.snapshot-id`.
+ /// Strictly parses the option `option_name` as an `i64`.
+ ///
+ /// Returns `Ok(None)` when the option is absent. When it is present but does
+ /// not parse, returns `DataInvalid` carrying the option name, the offending
+ /// value, and the underlying parse error as `source`.
+ fn parse_i64_option(&self, option_name: &'static str) -> crate::Result<Option<i64>> {
+ match self.options.get(option_name) {
+ Some(value) => value
+ .parse::<i64>()
+ .map(Some)
+ .map_err(|e| crate::Error::DataInvalid {
+ message: format!("Invalid value for {option_name}: '{value}'"),
+ source: Some(Box::new(e)),
+ }),
+ None => Ok(None),
+ }
+ }
+
+ /// Raw snapshot id accessor for `scan.snapshot-id`.
+ ///
+ /// This compatibility accessor is lossy: it returns `None` both when the
+ /// option is absent and when its value does not parse as `i64`, and it does
+ /// not validate selector conflicts. Internal time-travel planning should use
+ /// `try_time_travel_selector`, which reports parse failures as errors.
pub fn scan_snapshot_id(&self) -> Option<i64> {
self.options
.get(SCAN_SNAPSHOT_ID_OPTION)
.and_then(|v| v.parse().ok())
}
- /// Timestamp in millis for time travel via `scan.timestamp-millis`.
+ /// Raw timestamp accessor for `scan.timestamp-millis`.
+ ///
+ /// This compatibility accessor is lossy: it returns `None` both when the
+ /// option is absent and when its value does not parse as `i64`, and it does
+ /// not validate selector conflicts. Internal time-travel planning should use
+ /// `try_time_travel_selector`, which reports parse failures as errors.
pub fn scan_timestamp_millis(&self) -> Option<i64> {
self.options
.get(SCAN_TIMESTAMP_MILLIS_OPTION)
.and_then(|v| v.parse().ok())
}
- /// Tag name for time travel via `scan.tag-name`.
- pub fn scan_tag_name(&self) -> Option<&str> {
+ /// Raw tag name accessor for `scan.tag-name`.
+ ///
+ /// The returned slice borrows from the options map (lifetime `'a`), not from
+ /// this `CoreOptions` value. This compatibility accessor does not validate
+ /// selector conflicts. Internal time-travel planning should use
+ /// `try_time_travel_selector`.
+ pub fn scan_tag_name(&self) -> Option<&'a str> {
self.options.get(SCAN_TAG_NAME_OPTION).map(String::as_str)
}
+ /// Returns the time-travel option keys present in the options map, always in
+ /// the order tag name, snapshot id, timestamp millis. Only key presence is
+ /// checked here; values are not parsed.
+ fn configured_time_travel_selectors(&self) -> Vec<&'static str> {
+ [
+ SCAN_TAG_NAME_OPTION,
+ SCAN_SNAPSHOT_ID_OPTION,
+ SCAN_TIMESTAMP_MILLIS_OPTION,
+ ]
+ .iter()
+ .copied()
+ .filter(|key| self.options.contains_key(*key))
+ .collect()
+ }
+
+ /// Validates and normalizes the configured time-travel selector.
+ ///
+ /// This is the semantic owner for selector mutual exclusion and strict
+ /// numeric parsing. Returns `Ok(None)` when none of `scan.tag-name`,
+ /// `scan.snapshot-id` or `scan.timestamp-millis` is present.
+ ///
+ /// # Errors
+ ///
+ /// Returns `DataInvalid` when more than one selector key is present (the
+ /// message lists every key found), or when `scan.snapshot-id` /
+ /// `scan.timestamp-millis` does not parse as an `i64`.
+ pub(crate) fn try_time_travel_selector(&self) -> crate::Result<Option<TimeTravelSelector<'a>>> {
+ let selectors = self.configured_time_travel_selectors();
+ if selectors.len() > 1 {
+ return Err(crate::Error::DataInvalid {
+ message: format!(
+ "Only one time-travel selector may be set, found: {}",
+ selectors.join(", ")
+ ),
+ source: None,
+ });
+ }
+
+ // At most one selector key is present past this point, so the branch
+ // order below only decides which single selector gets parsed.
+ if let Some(tag_name) = self.scan_tag_name() {
+ Ok(Some(TimeTravelSelector::TagName(tag_name)))
+ } else if let Some(id) = self.parse_i64_option(SCAN_SNAPSHOT_ID_OPTION)? {
+ Ok(Some(TimeTravelSelector::SnapshotId(id)))
+ } else if let Some(ts) = self.parse_i64_option(SCAN_TIMESTAMP_MILLIS_OPTION)? {
+ Ok(Some(TimeTravelSelector::TimestampMillis(ts)))
+ } else {
+ Ok(None)
+ }
+ }
+
/// Explicit bucket key columns. If not set, defaults to primary keys for
PK tables.
pub fn bucket_key(&self) -> Option<Vec<String>> {
self.options
@@ -230,4 +302,86 @@ mod tests {
assert_eq!(core.partition_default_name(), "NULL_PART");
assert!(!core.legacy_partition_name());
}
+
+ /// Setting both `scan.tag-name` and `scan.snapshot-id` is rejected, and the
+ /// error names both keys.
+ #[test]
+ fn test_try_time_travel_selector_rejects_conflicting_selectors() {
+ let options = HashMap::from([
+ (SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string()),
+ (SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string()),
+ ]);
+ let core = CoreOptions::new(&options);
+
+ let err = core
+ .try_time_travel_selector()
+ .expect_err("conflicting selectors should fail");
+ match err {
+ crate::Error::DataInvalid { message, .. } => {
+ assert!(message.contains("Only one time-travel selector may be set"));
+ assert!(message.contains(SCAN_TAG_NAME_OPTION));
+ assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+ }
+ other => panic!("unexpected error: {other:?}"),
+ }
+ }
+
+ /// Non-numeric snapshot-id and timestamp values are reported as errors that
+ /// name the offending option.
+ #[test]
+ fn test_try_time_travel_selector_rejects_invalid_numeric_values() {
+ let snapshot_options =
+ HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "abc".to_string())]);
+ let snapshot_core = CoreOptions::new(&snapshot_options);
+
+ let snapshot_err = snapshot_core
+ .try_time_travel_selector()
+ .expect_err("invalid snapshot id should fail");
+ match snapshot_err {
+ crate::Error::DataInvalid { message, .. } => {
+ assert!(message.contains(SCAN_SNAPSHOT_ID_OPTION));
+ }
+ other => panic!("unexpected error: {other:?}"),
+ }
+
+ let timestamp_options =
+ HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "xyz".to_string())]);
+ let timestamp_core = CoreOptions::new(&timestamp_options);
+
+ let timestamp_err = timestamp_core
+ .try_time_travel_selector()
+ .expect_err("invalid timestamp millis should fail");
+ match timestamp_err {
+ crate::Error::DataInvalid { message, .. } => {
+ assert!(message.contains(SCAN_TIMESTAMP_MILLIS_OPTION));
+ }
+ other => panic!("unexpected error: {other:?}"),
+ }
+ }
+
+ /// Each single valid selector is normalized to its enum variant, and no
+ /// selector at all yields `None`.
+ #[test]
+ fn test_try_time_travel_selector_normalizes_valid_selector() {
+ let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string())]);
+ let tag_core = CoreOptions::new(&tag_options);
+ assert_eq!(
+ tag_core.try_time_travel_selector().expect("tag selector"),
+ Some(TimeTravelSelector::TagName("tag1"))
+ );
+
+ let snapshot_options =
+ HashMap::from([(SCAN_SNAPSHOT_ID_OPTION.to_string(), "7".to_string())]);
+ let snapshot_core = CoreOptions::new(&snapshot_options);
+ assert_eq!(
+ snapshot_core
+ .try_time_travel_selector()
+ .expect("snapshot selector"),
+ Some(TimeTravelSelector::SnapshotId(7))
+ );
+
+ let timestamp_options =
+ HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), "1234".to_string())]);
+ let timestamp_core = CoreOptions::new(&timestamp_options);
+ assert_eq!(
+ timestamp_core
+ .try_time_travel_selector()
+ .expect("timestamp selector"),
+ Some(TimeTravelSelector::TimestampMillis(1234))
+ );
+
+ let empty_options = HashMap::new();
+ let empty_core = CoreOptions::new(&empty_options);
+ assert_eq!(
+ empty_core.try_time_travel_selector().expect("no selector"),
+ None
+ );
+ }
}
diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs
index 2cddd82..50402a3 100644
--- a/crates/paimon/src/spec/mod.rs
+++ b/crates/paimon/src/spec/mod.rs
@@ -26,6 +26,7 @@ mod data_file;
pub use data_file::*;
mod core_options;
+pub(crate) use core_options::TimeTravelSelector;
pub use core_options::*;
mod schema;
diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs
index da2dc24..f014b4b 100644
--- a/crates/paimon/src/table/table_scan.rs
+++ b/crates/paimon/src/table/table_scan.rs
@@ -31,7 +31,7 @@ use crate::io::FileIO;
use crate::predicate_stats::data_leaf_may_match;
use crate::spec::{
eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind,
IndexManifest,
- ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
+ ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot,
TimeTravelSelector,
};
use crate::table::bin_pack::split_for_batch;
use crate::table::source::{DataSplit, DataSplitBuilder, DeletionFile,
PartitionBucket, Plan};
@@ -326,47 +326,52 @@ impl<'a> TableScan<'a> {
/// Plan the full scan: resolve snapshot (via options or latest), then read manifests and build DataSplits.
///
/// Time travel is resolved from table options:
+ /// - only one of `scan.tag-name`, `scan.snapshot-id`, `scan.timestamp-millis` may be set
+ /// - `scan.tag-name` → read the snapshot pinned by that tag
/// - `scan.snapshot-id` → read that specific snapshot
/// - `scan.timestamp-millis` → find the latest snapshot <= that timestamp
/// - otherwise → read the latest snapshot
+ ///
+ /// Returns an empty plan when no selector is set and no snapshot exists.
+ /// Conflicting selectors, non-numeric id/timestamp values, an unknown tag,
+ /// or no snapshot at or before the timestamp are reported as `DataInvalid`.
///
/// Reference: [TimeTravelUtil.tryTravelToSnapshot](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java)
pub async fn plan(&self) -> crate::Result<Plan> {
+ let Some(snapshot) = self.resolve_snapshot().await? else {
+ return Ok(Plan::new(Vec::new()));
+ };
+ self.plan_snapshot(snapshot).await
+ }
+
+ /// Resolves the snapshot to scan from the time-travel options, falling back
+ /// to the latest snapshot when no selector is set.
+ ///
+ /// `Ok(None)` is produced only on that fallback path, when
+ /// `get_latest_snapshot` finds nothing; a missing tag or an unmatched
+ /// timestamp is an error instead.
+ async fn resolve_snapshot(&self) -> crate::Result<Option<Snapshot>> {
let file_io = self.table.file_io();
let table_path = self.table.location();
let snapshot_manager = SnapshotManager::new(file_io.clone(), table_path.to_string());
let core_options = CoreOptions::new(self.table.schema().options());
- let snapshot = if let Some(tag_name) = core_options.scan_tag_name() {
- let tag_manager = TagManager::new(file_io.clone(), table_path.to_string());
- match tag_manager.get(tag_name).await? {
- Some(s) => s,
- None => {
- return Err(Error::DataInvalid {
+ match core_options.try_time_travel_selector()? {
+ Some(TimeTravelSelector::TagName(tag_name)) => {
+ let tag_manager = TagManager::new(file_io.clone(), table_path.to_string());
+ match tag_manager.get(tag_name).await? {
+ Some(s) => Ok(Some(s)),
+ None => Err(Error::DataInvalid {
message: format!("Tag '{tag_name}' doesn't exist."),
source: None,
- })
+ }),
}
}
- } else if let Some(id) = core_options.scan_snapshot_id() {
- snapshot_manager.get_snapshot(id).await?
- } else if let Some(ts) = core_options.scan_timestamp_millis() {
- match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
- Some(s) => s,
- None => {
- return Err(Error::DataInvalid {
+ Some(TimeTravelSelector::SnapshotId(id)) => {
+ snapshot_manager.get_snapshot(id).await.map(Some)
+ }
+ Some(TimeTravelSelector::TimestampMillis(ts)) => {
+ match snapshot_manager.earlier_or_equal_time_mills(ts).await? {
+ Some(s) => Ok(Some(s)),
+ None => Err(Error::DataInvalid {
message: format!("No snapshot found with timestamp <= {ts}"),
source: None,
- })
+ }),
}
}
- } else {
- match snapshot_manager.get_latest_snapshot().await? {
- Some(s) => s,
- None => return Ok(Plan::new(Vec::new())),
- }
- };
- self.plan_snapshot(snapshot).await
+ None => snapshot_manager.get_latest_snapshot().await,
+ }
}
/// Apply a limit-pushdown hint to the generated splits.