jerry-024 commented on code in PR #309:
URL: https://github.com/apache/paimon-rust/pull/309#discussion_r3206082599


##########
crates/integrations/datafusion/src/sql_context.rs:
##########
@@ -275,6 +405,130 @@ impl SQLContext {
         }
     }
 
+    /// Handle SQL queries containing time-travel syntax (`VERSION AS OF` / `TIMESTAMP AS OF`).
+    ///
+    /// DataFusion's default SQL parser does not support these clauses, so we:
+    /// 1. Extract the table name and version/timestamp value via regex
+    /// 2. Strip the time-travel clause from the SQL
+    /// 3. Create a `PaimonTableProvider` with the appropriate scan options
+    /// 4. Register it, execute the stripped SQL, then deregister
+    async fn handle_time_travel_query(&self, sql: &str) -> DFResult<DataFrame> {
+        use crate::table::PaimonTableProvider;
+        use paimon::spec::{SCAN_TIMESTAMP_MILLIS_OPTION, SCAN_VERSION_OPTION};
+
+        let (table_name, options, clause_range) = if let Some(info) = extract_version_as_of(sql) {
+            let options = HashMap::from([(SCAN_VERSION_OPTION.to_string(), info.version)]);
+            (info.table_name, options, info.clause_range)
+        } else if let Some(info) = extract_timestamp_as_of(sql) {
+            let millis = Self::parse_timestamp_to_millis(&info.timestamp)?;
+            let options =
+                HashMap::from([(SCAN_TIMESTAMP_MILLIS_OPTION.to_string(), millis.to_string())]);
+            (info.table_name, options, info.clause_range)
+        } else {
+            return Err(DataFusionError::Plan(
+                "Failed to parse time-travel clause in SQL".to_string(),
+            ));
+        };
+
+        // Resolve the table from our catalog and create a provider with scan options
+        let table_ref: datafusion::common::TableReference = table_name.as_str().into();
+        let (catalog, _catalog_name, identifier) = self.resolve_table_name_from_ref(&table_ref)?;
+
+        let paimon_table = catalog
+            .get_table(&identifier)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let table_with_options = paimon_table.copy_with_options(options);
+        let provider = Arc::new(PaimonTableProvider::try_new(table_with_options)?);
+
+        // Use a UUID-based temp table name to avoid conflicts with existing tables.
+        let uuid_name = format!("__paimon_tt_{}", uuid::Uuid::new_v4().as_simple());
+
+        // Replace the original table name + time-travel clause with just the UUID name
+        let rewritten_sql = format!(
+            "{}{}{}",
+            &sql[..clause_range.0],
+            uuid_name,
+            &sql[clause_range.1..]
+        );
+
+        // Register the provider under the UUID temp table name
+        self.register_temp_table(uuid_name.as_str(), provider)?;

Review Comment:
   **P1 — Temp table leak on panic**
   
   If `self.ctx.sql()` panics, this temp table is never cleaned up. This PR 
already introduces `TempTableTracker` as an RAII guard for exactly this pattern 
(used in `execute_cow_delete_once`, `execute_cow_merge_once`, etc.), but it's 
not used here.
   
   Suggested fix:
   ```rust
   let mut tracker = TempTableTracker::new(self);
   self.register_temp_table(uuid_name.as_str(), provider)?;
   tracker.register(&uuid_name);
   
   let result = self.ctx.sql(&rewritten_sql).await;
   // tracker auto-deregisters on drop
   result
   ```
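
   For reference, a minimal, self-contained sketch of the RAII pattern this relies on. `Registry`, `TempGuard`, and `with_temp_table` are hypothetical stand-ins written for illustration; they are not the PR's `SQLContext` or `TempTableTracker`, whose exact signatures may differ. The point is only that cleanup placed in `Drop` runs on normal return, on an early `Err`, and during panic unwinding.

   ```rust
   use std::collections::HashSet;
   use std::sync::Mutex;

   /// Hypothetical stand-in for the context that owns temp tables.
   struct Registry {
       tables: Mutex<HashSet<String>>,
   }

   impl Registry {
       fn new() -> Self {
           Registry { tables: Mutex::new(HashSet::new()) }
       }
       fn register(&self, name: &str) {
           let mut t = self.tables.lock().unwrap_or_else(|e| e.into_inner());
           t.insert(name.to_string());
       }
       fn deregister(&self, name: &str) {
           let mut t = self.tables.lock().unwrap_or_else(|e| e.into_inner());
           t.remove(name);
       }
       fn contains(&self, name: &str) -> bool {
           let t = self.tables.lock().unwrap_or_else(|e| e.into_inner());
           t.contains(name)
       }
   }

   /// Hypothetical guard: deregisters every tracked name when it is dropped.
   struct TempGuard<'a> {
       registry: &'a Registry,
       names: Vec<String>,
   }

   impl<'a> TempGuard<'a> {
       fn new(registry: &'a Registry) -> Self {
           TempGuard { registry, names: Vec::new() }
       }
       fn track(&mut self, name: &str) {
           self.names.push(name.to_string());
       }
   }

   impl Drop for TempGuard<'_> {
       fn drop(&mut self) {
           // Runs on success, on early return, and while unwinding from a panic.
           for name in &self.names {
               self.registry.deregister(name);
           }
       }
   }

   /// Registers `name`, runs `body`, and leaves cleanup to the guard.
   fn with_temp_table<T, F>(registry: &Registry, name: &str, body: F) -> Result<T, String>
   where
       F: FnOnce() -> Result<T, String>,
   {
       let mut guard = TempGuard::new(registry);
       registry.register(name);
       guard.track(name);
       body()
   }
   ```

   In an `async fn`, a guard held as a local is also dropped if the future itself is dropped before completion, so the same pattern should cover cancellation as well as panics.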



##########
crates/integrations/datafusion/src/catalog.rs:
##########
@@ -162,13 +190,102 @@ impl CatalogProvider for PaimonCatalogProvider {
                     Arc::clone(&catalog),
                     name,
                     dynamic_options,
+                    None,
                 )) as Arc<dyn SchemaProvider>))
             },
             "paimon catalog access thread panicked",
         )
     }
 }
 
+impl PaimonCatalogProvider {
+    /// Creates or returns an existing temporary in-memory database for temp tables/views.
+    fn get_or_create_temp_database(&self, name: &str) -> Arc<MemorySchemaProvider> {
+        let mut databases = self.temp_tables.write().unwrap_or_else(|e| e.into_inner());
+        databases
+            .entry(name.to_string())
+            .or_insert_with(|| Arc::new(MemorySchemaProvider::new()))
+            .clone()
+    }
+
+    /// Registers a temporary table or view in the specified database.
+    /// Creates the database if it does not exist.
+    ///
+    /// Returns an error if a temp table with the same name already exists in
+    /// the same database. Logs a warning if the name shadows a real Paimon table.
+    pub fn register_temp_table(
+        &self,
+        database: &str,
+        table_name: &str,
+        table: Arc<dyn TableProvider>,
+    ) -> DFResult<()> {
+        // Check if a temp table with this name already exists
+        {
+            let databases = self.temp_tables.read().unwrap_or_else(|e| e.into_inner());

Review Comment:
   **P1 — TOCTOU between existence check and registration**
   
   This read-lock check and the actual registration via 
`get_or_create_temp_database` (line 254) are done in separate lock 
acquisitions. Between them, there's a blocking `block_on_with_runtime` call 
(shadow warning, lines 235-252) where another thread could register a table 
with the same name.
   
   Since `register_temp_table` is a public API, consider holding the write lock 
for the entire check-then-register operation, or using 
`MemorySchemaProvider::register_table`'s return value to detect the conflict 
atomically.
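
   A minimal sketch of the first option, assuming a plain nested `HashMap` behind an `RwLock`. `TempTables` and its `u32` payload are hypothetical stand-ins for the provider's `temp_tables` map and `Arc<dyn TableProvider>`. The existence check and the insert happen under one write-lock acquisition via the `Entry` API, so no other thread can slip in between them.

   ```rust
   use std::collections::hash_map::Entry;
   use std::collections::HashMap;
   use std::sync::RwLock;

   /// Hypothetical stand-in: database name -> table name -> payload.
   struct TempTables {
       inner: RwLock<HashMap<String, HashMap<String, u32>>>,
   }

   impl TempTables {
       fn new() -> Self {
           TempTables { inner: RwLock::new(HashMap::new()) }
       }

       /// Checks for a conflict and inserts while holding a single write lock.
       fn register(&self, database: &str, table: &str, payload: u32) -> Result<(), String> {
           let mut databases = self.inner.write().unwrap_or_else(|e| e.into_inner());
           // Create the database on demand, as the PR's helper does.
           let tables = databases.entry(database.to_string()).or_default();
           match tables.entry(table.to_string()) {
               Entry::Occupied(_) => {
                   Err(format!("temp table '{}.{}' already exists", database, table))
               }
               Entry::Vacant(slot) => {
                   slot.insert(payload);
                   Ok(())
               }
           }
       }

       fn get(&self, database: &str, table: &str) -> Option<u32> {
           let databases = self.inner.read().unwrap_or_else(|e| e.into_inner());
           databases.get(database).and_then(|t| t.get(table)).copied()
       }
   }
   ```

   The sketch keeps the critical section to the map operations only. Whether the shadow-table lookup should run before the lock is taken or after it is released is a trade-off for the PR author; holding a write lock across a blocking catalog call would serialize all registrations behind it.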



##########
crates/integrations/datafusion/src/sql_context.rs:
##########
@@ -1498,6 +1942,115 @@ fn datum_to_constant_array(
     }
 }
 
+struct VersionAsOfInfo {
+    table_name: String,
+    version: String,
+    /// Byte range (start, end) covering "table_name VERSION AS OF n"
+    clause_range: (usize, usize),
+}
+
+struct TimestampAsOfInfo {
+    table_name: String,
+    timestamp: String,
+    /// Byte range (start, end) covering "table_name TIMESTAMP AS OF 'ts'"
+    clause_range: (usize, usize),
+}
+
+/// Extract `VERSION AS OF <n>` or `VERSION AS OF '<tag>'` from a SQL string.
+///
+/// Looks for the pattern (case-insensitive):
+/// - `... VERSION AS OF <number>` — numeric snapshot ID
+/// - `... VERSION AS OF '<tag>'` — tag name (quoted string)
+///
+/// Returns the table name, version/tag value, and byte range of the full clause.
+fn extract_version_as_of(sql: &str) -> Option<VersionAsOfInfo> {
+    let lower = sql.to_lowercase();

Review Comment:
   **P0 — Must fix (Java Paimon compatibility)**
   
   `lower.find("version as of ")` is a naive string search that will fail on 
valid SQL patterns that work in Java Paimon (Flink/Spark):
   
   1. **JOIN two time-travel tables**: `SELECT * FROM t1 VERSION AS OF 1 JOIN 
t2 VERSION AS OF 2 ON ...` — only the first match is found; the second is 
silently ignored.
   2. **Subquery / CTE**: `WITH cte AS (SELECT * FROM t VERSION AS OF 1) SELECT 
* FROM cte` — `clause_range` replacement corrupts the SQL structure.
   3. **String literal false positive**: `WHERE note = 'version as of 1'` — 
matches inside a string constant.
   4. **Table alias after clause**: `FROM t VERSION AS OF 1 AS tt` — 
replacement range doesn't account for the alias.
   
   In Java Paimon, all of these are handled correctly by the SQL parser. Users 
migrating SQL from Flink/Spark will hit unexpected failures.
   
   **Suggestion:** Consider a two-pass approach — first extract and strip 
time-travel clauses with awareness of quoted strings and parenthesis nesting, 
then hand the cleaned SQL to `Parser::parse_sql`. At minimum, add a 
quoted-string skip to avoid false positives inside string literals.
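
   A minimal sketch of the "at minimum" option, covering cases 1 and 3 only. `find_outside_quotes` is a hypothetical helper, not code from this PR. It scans the original string byte by byte, skips single-quoted literals (treating `''` as an escaped quote), and returns every match rather than just the first. It matches ASCII case-insensitively on the original bytes, which also avoids offset drift: `to_lowercase()` can change the byte length of non-ASCII input, so offsets found in a lowercased copy are not guaranteed to be valid in the original.

   ```rust
   /// Returns the byte offsets in `sql` of every ASCII case-insensitive
   /// occurrence of `keyword` that lies outside a single-quoted literal.
   /// `keyword` is assumed to be ASCII.
   fn find_outside_quotes(sql: &str, keyword: &str) -> Vec<usize> {
       let bytes = sql.as_bytes();
       let kw = keyword.as_bytes();
       let mut hits = Vec::new();
       let mut in_string = false;
       let mut i = 0;
       while i < bytes.len() {
           if in_string {
               if bytes[i] == b'\'' {
                   // A doubled quote inside a literal is an escape, not the end.
                   if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                       i += 2;
                       continue;
                   }
                   in_string = false;
               }
               i += 1;
           } else if bytes[i] == b'\'' {
               in_string = true;
               i += 1;
           } else if !kw.is_empty()
               && i + kw.len() <= bytes.len()
               && bytes[i..i + kw.len()].eq_ignore_ascii_case(kw)
           {
               hits.push(i);
               i += kw.len();
           } else {
               i += 1;
           }
       }
       hits
   }
   ```

   This does not check word boundaries and does not handle SQL comments, double-quoted identifiers, parenthesis nesting, or trailing aliases, so cases 2 and 4 would still need the fuller two-pass approach.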



##########
docs/src/sql.md:
##########
@@ -560,6 +589,94 @@ SELECT * FROM paimon.my_db.assets;
 RESET 'paimon.blob-as-descriptor';
 ```
 
+## Temporary Tables
+
+You can register in-memory temporary tables under any catalog. Temporary tables exist only for the lifetime of the `SQLContext` instance and are automatically cleaned up when the context is dropped.
+
+The table name accepts flexible references, similar to DataFusion:
+- `"my_table"` — uses the current catalog and current database
+- `"database.my_table"` — uses the current catalog with the specified database
+- `"catalog.database.my_table"` — fully qualified
+
+### register_mem_table

Review Comment:
   **P1 — Non-existent API**
   
   `register_mem_table` is documented here but doesn't exist in the code — the 
PR implements `register_temp_table` instead. Either add `register_mem_table` as 
a convenience wrapper, or update the docs to reference `register_temp_table`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
