This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dc57643a feat(sqllogictest): use serde derived structs for schedule 
parsing (#1953)
5dc57643a is described below

commit 5dc57643aebbe1be3ac36e29581fa7462bfdd2cd
Author: Andrea Bozzo <[email protected]>
AuthorDate: Thu Dec 25 01:45:35 2025 +0100

    feat(sqllogictest): use serde derived structs for schedule parsing (#1953)
    
    This PR refactors the schedule file parsing in the sqllogictest crate to
    use serde-derived structs instead of manual TOML parsing, as requested
    in #1952.
    
    ### Changes
    
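    - **New types with serde derives:**
      - `ScheduleConfig` - top-level configuration parsed from TOML
      - `EngineConfig` - internally tagged enum
        (`#[serde(tag = "type", rename_all = "lowercase")]`) with one variant
        per supported engine; currently only `Datafusion`
      - `DatafusionCatalogConfig` - optional catalog configuration for the
        DataFusion engine (a `type` string plus a `props` map)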
    
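    - **Refactored parsing flow:**
      - `Schedule::from_file()` now uses `toml::from_str()` directly
      - Added `instantiate_engines()` to separate parsing from engine creation
      - Removed manual `parse_engines()` and `parse_steps()` functions
      - `load_engine_runner()` now takes a typed `EngineConfig` instead of an
        engine-type string plus a raw TOML table, so unknown engine types are
        rejected at parse time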
    
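    - **Forward-compatibility:**
      - The `Datafusion` variant accepts an optional
        `[engines.<name>.catalog]` table (`type` plus `[...catalog.props]`),
        which is deserialized into `DatafusionCatalogConfig` and passed to
        `DataFusionEngine::new()`
      - The engine still always creates an in-memory catalog; this enables
        PR #1943 to add support for honoring `catalog.type` and
        `catalog.props`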
    
    ### Relation to #1943
    
    This PR was suggested by @liurenjie1024 as a prerequisite to #1943
    (dynamic catalog configuration). The typed `catalog` field allows #1943
    to read the catalog configuration from `EngineConfig::Datafusion { catalog }`
    without modifying the parsing logic.
    
    ### Testing
    
    - All existing tests pass
    - Added new unit tests for deserialization behavior
    - Integration test with `df_test.toml` passes unchanged
    
    Closes #1952
---
 crates/sqllogictest/src/engine/datafusion.rs |  18 ++-
 crates/sqllogictest/src/engine/mod.rs        |  96 +++++++++----
 crates/sqllogictest/src/schedule.rs          | 198 +++++++++++++++++----------
 3 files changed, 207 insertions(+), 105 deletions(-)

diff --git a/crates/sqllogictest/src/engine/datafusion.rs 
b/crates/sqllogictest/src/engine/datafusion.rs
index e3402dfa9..e9f93287d 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, 
Transform, Type, Unbound
 use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
 use iceberg_datafusion::IcebergCatalogProvider;
 use indicatif::ProgressBar;
-use toml::Table as TomlTable;
 
-use crate::engine::{EngineRunner, run_slt_with_runner};
+use crate::engine::{DatafusionCatalogConfig, EngineRunner, 
run_slt_with_runner};
 use crate::error::Result;
 
 pub struct DataFusionEngine {
@@ -59,12 +58,15 @@ impl EngineRunner for DataFusionEngine {
 }
 
 impl DataFusionEngine {
-    pub async fn new(config: TomlTable) -> Result<Self> {
+    pub async fn new(catalog_config: Option<DatafusionCatalogConfig>) -> 
Result<Self> {
         let session_config = SessionConfig::new()
             .with_target_partitions(4)
             .with_information_schema(true);
         let ctx = SessionContext::new_with_config(session_config);
-        ctx.register_catalog("default", Self::create_catalog(&config).await?);
+        ctx.register_catalog(
+            "default",
+            Self::create_catalog(catalog_config.as_ref()).await?,
+        );
 
         Ok(Self {
             test_data_path: PathBuf::from("testdata"),
@@ -72,9 +74,11 @@ impl DataFusionEngine {
         })
     }
 
-    async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn 
CatalogProvider>> {
-        // TODO: support dynamic catalog configuration
-        //  See: https://github.com/apache/iceberg-rust/issues/1780
+    async fn create_catalog(
+        _catalog_config: Option<&DatafusionCatalogConfig>,
+    ) -> anyhow::Result<Arc<dyn CatalogProvider>> {
+        // TODO: Use catalog_config to load different catalog types via 
iceberg-catalog-loader
+        // See: https://github.com/apache/iceberg-rust/issues/1780
         let catalog = MemoryCatalogBuilder::default()
             .load(
                 "memory",
diff --git a/crates/sqllogictest/src/engine/mod.rs 
b/crates/sqllogictest/src/engine/mod.rs
index 724359fbe..a27667140 100644
--- a/crates/sqllogictest/src/engine/mod.rs
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -17,29 +17,45 @@
 
 mod datafusion;
 
+use std::collections::HashMap;
 use std::path::Path;
 
 use anyhow::anyhow;
+use serde::Deserialize;
 use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
-use toml::Table as TomlTable;
 
 use crate::engine::datafusion::DataFusionEngine;
 use crate::error::{Error, Result};
 
-const TYPE_DATAFUSION: &str = "datafusion";
+/// Configuration for the catalog used by the DataFusion engine
+#[derive(Debug, Clone, Deserialize)]
+pub struct DatafusionCatalogConfig {
+    /// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql"
+    #[serde(rename = "type")]
+    pub catalog_type: String,
+    /// Catalog properties passed to the catalog loader
+    #[serde(default)]
+    pub props: HashMap<String, String>,
+}
+
+/// Engine configuration as a tagged enum
+#[derive(Debug, Clone, Deserialize)]
+#[serde(tag = "type", rename_all = "lowercase")]
+pub enum EngineConfig {
+    Datafusion {
+        #[serde(default)]
+        catalog: Option<DatafusionCatalogConfig>,
+    },
+}
 
 #[async_trait::async_trait]
 pub trait EngineRunner: Send {
     async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
 }
 
-pub async fn load_engine_runner(
-    engine_type: &str,
-    cfg: TomlTable,
-) -> Result<Box<dyn EngineRunner>> {
-    match engine_type {
-        TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
-        _ => Err(anyhow::anyhow!("Unsupported engine type: 
{engine_type}").into()),
+pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn 
EngineRunner>> {
+    match config {
+        EngineConfig::Datafusion { catalog } => 
Ok(Box::new(DataFusionEngine::new(catalog).await?)),
     }
 }
 
@@ -65,29 +81,63 @@ where
 
 #[cfg(test)]
 mod tests {
-    use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
+    use crate::engine::{DatafusionCatalogConfig, EngineConfig, 
load_engine_runner};
 
-    #[tokio::test]
-    async fn test_engine_invalid_type() {
+    #[test]
+    fn test_deserialize_engine_config() {
+        let input = r#"type = "datafusion""#;
+
+        let config: EngineConfig = toml::from_str(input).unwrap();
+        assert!(matches!(config, EngineConfig::Datafusion { catalog: None }));
+    }
+
+    #[test]
+    fn test_deserialize_engine_config_with_catalog() {
+        let input = r#"
+            type = "datafusion"
+
+            [catalog]
+            type = "rest"
+
+            [catalog.props]
+            uri = "http://localhost:8181";
+        "#;
+
+        let config: EngineConfig = toml::from_str(input).unwrap();
+        match config {
+            EngineConfig::Datafusion { catalog: Some(cat) } => {
+                assert_eq!(cat.catalog_type, "rest");
+                assert_eq!(
+                    cat.props.get("uri"),
+                    Some(&"http://localhost:8181".to_string())
+                );
+            }
+            _ => panic!("Expected Datafusion with catalog"),
+        }
+    }
+
+    #[test]
+    fn test_deserialize_catalog_config() {
         let input = r#"
-            [engines]
-            random = { type = "random_engine", url = "http://localhost:8181"; }
+            type = "memory"
+
+            [props]
+            warehouse = "file:///tmp/warehouse"
         "#;
-        let tbl = toml::from_str(input).unwrap();
-        let result = load_engine_runner("random_engine", tbl).await;
 
-        assert!(result.is_err());
+        let config: DatafusionCatalogConfig = toml::from_str(input).unwrap();
+        assert_eq!(config.catalog_type, "memory");
+        assert_eq!(
+            config.props.get("warehouse"),
+            Some(&"file:///tmp/warehouse".to_string())
+        );
     }
 
     #[tokio::test]
     async fn test_load_datafusion() {
-        let input = r#"
-            [engines]
-            df = { type = "datafusion" }
-        "#;
-        let tbl = toml::from_str(input).unwrap();
-        let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
+        let config = EngineConfig::Datafusion { catalog: None };
 
+        let result = load_engine_runner(config).await;
         assert!(result.is_ok());
     }
 }
diff --git a/crates/sqllogictest/src/schedule.rs 
b/crates/sqllogictest/src/schedule.rs
index 7c13ad4d1..25728a296 100644
--- a/crates/sqllogictest/src/schedule.rs
+++ b/crates/sqllogictest/src/schedule.rs
@@ -21,10 +21,18 @@ use std::path::{Path, PathBuf};
 
 use anyhow::{Context, anyhow};
 use serde::{Deserialize, Serialize};
-use toml::{Table as TomlTable, Value};
 use tracing::info;
 
-use crate::engine::{EngineRunner, load_engine_runner};
+use crate::engine::{EngineConfig, EngineRunner, load_engine_runner};
+
+/// Raw configuration parsed from the schedule TOML file
+#[derive(Debug, Clone, Deserialize)]
+pub struct ScheduleConfig {
+    /// Engine name to engine configuration
+    pub engines: HashMap<String, EngineConfig>,
+    /// List of test steps to run
+    pub steps: Vec<Step>,
+}
 
 pub struct Schedule {
     /// Engine names to engine instances
@@ -59,15 +67,27 @@ impl Schedule {
     pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
         let path_str = path.as_ref().to_string_lossy().to_string();
         let content = read_to_string(path)?;
-        let toml_value = content.parse::<Value>()?;
-        let toml_table = toml_value
-            .as_table()
-            .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?;
 
-        let engines = Schedule::parse_engines(toml_table).await?;
-        let steps = Schedule::parse_steps(toml_table)?;
+        let config: ScheduleConfig = toml::from_str(&content)
+            .with_context(|| format!("Failed to parse schedule file: 
{path_str}"))?;
 
-        Ok(Self::new(engines, steps, path_str))
+        let engines = Self::instantiate_engines(config.engines).await?;
+
+        Ok(Self::new(engines, config.steps, path_str))
+    }
+
+    /// Instantiate engine runners from their configurations
+    async fn instantiate_engines(
+        configs: HashMap<String, EngineConfig>,
+    ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
+        let mut engines = HashMap::new();
+
+        for (name, config) in configs {
+            let engine = load_engine_runner(config).await?;
+            engines.insert(name, engine);
+        }
+
+        Ok(engines)
     }
 
     pub async fn run(mut self) -> anyhow::Result<()> {
@@ -105,103 +125,131 @@ impl Schedule {
         }
         Ok(())
     }
+}
 
-    async fn parse_engines(
-        table: &TomlTable,
-    ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
-        let engines_tbl = table
-            .get("engines")
-            .with_context(|| "Schedule file must have an 'engines' table")?
-            .as_table()
-            .ok_or_else(|| anyhow!("'engines' must be a table"))?;
-
-        let mut engines = HashMap::new();
-
-        for (name, engine_val) in engines_tbl {
-            let cfg_tbl = engine_val
-                .as_table()
-                .ok_or_else(|| anyhow!("Config of engine '{name}' is not a 
table"))?
-                .clone();
-
-            let engine_type = cfg_tbl
-                .get("type")
-                .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 
'type' field"))?
-                .as_str()
-                .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a 
string"))?;
-
-            let engine = load_engine_runner(engine_type, 
cfg_tbl.clone()).await?;
-
-            if engines.insert(name.clone(), engine).is_some() {
-                return Err(anyhow!("Duplicate engine '{name}'"));
-            }
-        }
+#[cfg(test)]
+mod tests {
+    use crate::engine::EngineConfig;
+    use crate::schedule::ScheduleConfig;
 
-        Ok(engines)
-    }
+    #[test]
+    fn test_deserialize_schedule_config() {
+        let input = r#"
+            [engines]
+            df = { type = "datafusion" }
 
-    fn parse_steps(table: &TomlTable) -> anyhow::Result<Vec<Step>> {
-        let steps_val = table
-            .get("steps")
-            .with_context(|| "Schedule file must have a 'steps' array")?;
+            [[steps]]
+            engine = "df"
+            slt = "test.slt"
+        "#;
 
-        let steps: Vec<Step> = steps_val
-            .clone()
-            .try_into()
-            .with_context(|| "Failed to deserialize steps")?;
+        let config: ScheduleConfig = toml::from_str(input).unwrap();
 
-        Ok(steps)
+        assert_eq!(config.engines.len(), 1);
+        assert!(config.engines.contains_key("df"));
+        assert!(matches!(config.engines["df"], EngineConfig::Datafusion {
+            catalog: None
+        }));
+        assert_eq!(config.steps.len(), 1);
+        assert_eq!(config.steps[0].engine, "df");
+        assert_eq!(config.steps[0].slt, "test.slt");
     }
-}
-
-#[cfg(test)]
-mod tests {
-    use toml::Table as TomlTable;
-
-    use crate::schedule::Schedule;
 
     #[test]
-    fn test_parse_steps() {
+    fn test_deserialize_multiple_steps() {
         let input = r#"
+            [engines]
+            datafusion = { type = "datafusion" }
+
             [[steps]]
             engine = "datafusion"
             slt = "test.slt"
 
             [[steps]]
-            engine = "spark"
+            engine = "datafusion"
             slt = "test2.slt"
         "#;
 
-        let tbl: TomlTable = toml::from_str(input).unwrap();
-        let steps = Schedule::parse_steps(&tbl).unwrap();
+        let config: ScheduleConfig = toml::from_str(input).unwrap();
 
-        assert_eq!(steps.len(), 2);
-        assert_eq!(steps[0].engine, "datafusion");
-        assert_eq!(steps[0].slt, "test.slt");
-        assert_eq!(steps[1].engine, "spark");
-        assert_eq!(steps[1].slt, "test2.slt");
+        assert_eq!(config.steps.len(), 2);
+        assert_eq!(config.steps[0].engine, "datafusion");
+        assert_eq!(config.steps[0].slt, "test.slt");
+        assert_eq!(config.steps[1].engine, "datafusion");
+        assert_eq!(config.steps[1].slt, "test2.slt");
     }
 
     #[test]
-    fn test_parse_steps_empty() {
+    fn test_deserialize_with_catalog_config() {
         let input = r#"
+            [engines.df]
+            type = "datafusion"
+
+            [engines.df.catalog]
+            type = "rest"
+
+            [engines.df.catalog.props]
+            uri = "http://localhost:8181";
+
             [[steps]]
+            engine = "df"
+            slt = "test.slt"
         "#;
 
-        let tbl: TomlTable = toml::from_str(input).unwrap();
-        let steps = Schedule::parse_steps(&tbl);
+        let config: ScheduleConfig = toml::from_str(input).unwrap();
 
-        assert!(steps.is_err());
+        match &config.engines["df"] {
+            EngineConfig::Datafusion { catalog: Some(cat) } => {
+                assert_eq!(cat.catalog_type, "rest");
+                assert_eq!(
+                    cat.props.get("uri"),
+                    Some(&"http://localhost:8181".to_string())
+                );
+            }
+            _ => panic!("Expected Datafusion with catalog config"),
+        }
     }
 
-    #[tokio::test]
-    async fn test_parse_engines_invalid_table() {
-        let toml_content = r#"
-            engines = "not_a_table"
+    #[test]
+    fn test_deserialize_missing_engine_type() {
+        let input = r#"
+            [engines]
+            df = { }
+
+            [[steps]]
+            engine = "df"
+            slt = "test.slt"
         "#;
 
-        let table: TomlTable = toml::from_str(toml_content).unwrap();
-        let result = Schedule::parse_engines(&table).await;
+        let result: Result<ScheduleConfig, _> = toml::from_str(input);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_deserialize_invalid_engine_type() {
+        let input = r#"
+            [engines]
+            df = { type = "unknown_engine" }
+
+            [[steps]]
+            engine = "df"
+            slt = "test.slt"
+        "#;
+
+        let result: Result<ScheduleConfig, _> = toml::from_str(input);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn test_deserialize_missing_step_fields() {
+        let input = r#"
+            [engines]
+            df = { type = "datafusion" }
+
+            [[steps]]
+        "#;
 
+        let result: Result<ScheduleConfig, _> = toml::from_str(input);
         assert!(result.is_err());
     }
 }

Reply via email to