liurenjie1024 commented on code in PR #1764:
URL: https://github.com/apache/iceberg-rust/pull/1764#discussion_r2447586071
##########
crates/sqllogictest/src/engine/datafusion.rs:
##########
@@ -15,53 +15,76 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use anyhow::{Context, anyhow};
use datafusion::catalog::CatalogProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_sqllogictest::DataFusion;
+use iceberg::CatalogBuilder;
+use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+use iceberg_datafusion::IcebergCatalogProvider;
use indicatif::ProgressBar;
-use sqllogictest::runner::AsyncDB;
+use tempfile::TempDir;
use toml::Table as TomlTable;
-use crate::engine::EngineRunner;
+use crate::engine::{EngineRunner, run_slt_with_runner};
use crate::error::Result;
pub struct DataFusionEngine {
- datafusion: DataFusion,
+ test_data_path: PathBuf,
+ session_context: SessionContext,
}
#[async_trait::async_trait]
impl EngineRunner for DataFusionEngine {
async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
- let content = std::fs::read_to_string(path)
- .with_context(|| format!("Failed to read slt file {:?}", path))
- .map_err(|e| anyhow!(e))?;
+ let ctx = self.session_context.clone();
+ let testdata = self.test_data_path.clone();
- self.datafusion
- .run(content.as_str())
- .await
- .with_context(|| format!("Failed to run slt file {:?}", path))
- .map_err(|e| anyhow!(e))?;
+ let runner = sqllogictest::Runner::new({
+ move || {
+ let ctx = ctx.clone();
+ let testdata = testdata.clone();
+ async move {
+ // Everything here is owned; no `self` capture.
+ Ok(DataFusion::new(ctx, testdata, ProgressBar::new(100)))
+ }
+ }
+ });
- Ok(())
+ run_slt_with_runner(runner, path).await
}
}
impl DataFusionEngine {
pub async fn new(config: TomlTable) -> Result<Self> {
- let session_config = SessionConfig::new().with_target_partitions(4);
+ let session_config = SessionConfig::new()
+ .with_target_partitions(4)
+ .with_information_schema(true);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);
Ok(Self {
- datafusion: DataFusion::new(ctx, PathBuf::from("testdata"),
ProgressBar::new(100)),
+ test_data_path: PathBuf::from("testdata"),
+ session_context: ctx,
})
}
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn
CatalogProvider>> {
- todo!()
+ let temp_dir = TempDir::new()?;
+ let path_str = temp_dir.path().to_str().unwrap().to_string();
Review Comment:
Could we just use `memory://` instead of a temp directory? IIRC, OpenDAL supports an in-memory file system.
##########
crates/sqllogictest/Cargo.toml:
##########
@@ -30,13 +30,26 @@ async-trait = { workspace = true }
datafusion = { workspace = true }
datafusion-sqllogictest = { workspace = true }
enum-ordinalize = { workspace = true }
+env_logger = "0.11.8"
Review Comment:
Please put all dependencies in the workspace `Cargo.toml`.
##########
crates/sqllogictest/src/engine/datafusion.rs:
##########
@@ -15,53 +15,76 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
-use anyhow::{Context, anyhow};
use datafusion::catalog::CatalogProvider;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_sqllogictest::DataFusion;
+use iceberg::CatalogBuilder;
+use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+use iceberg_datafusion::IcebergCatalogProvider;
use indicatif::ProgressBar;
-use sqllogictest::runner::AsyncDB;
+use tempfile::TempDir;
use toml::Table as TomlTable;
-use crate::engine::EngineRunner;
+use crate::engine::{EngineRunner, run_slt_with_runner};
use crate::error::Result;
pub struct DataFusionEngine {
- datafusion: DataFusion,
+ test_data_path: PathBuf,
+ session_context: SessionContext,
}
#[async_trait::async_trait]
impl EngineRunner for DataFusionEngine {
async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
- let content = std::fs::read_to_string(path)
- .with_context(|| format!("Failed to read slt file {:?}", path))
- .map_err(|e| anyhow!(e))?;
+ let ctx = self.session_context.clone();
+ let testdata = self.test_data_path.clone();
- self.datafusion
- .run(content.as_str())
- .await
- .with_context(|| format!("Failed to run slt file {:?}", path))
- .map_err(|e| anyhow!(e))?;
+ let runner = sqllogictest::Runner::new({
+ move || {
+ let ctx = ctx.clone();
+ let testdata = testdata.clone();
+ async move {
+ // Everything here is owned; no `self` capture.
+ Ok(DataFusion::new(ctx, testdata, ProgressBar::new(100)))
+ }
+ }
+ });
- Ok(())
+ run_slt_with_runner(runner, path).await
}
}
impl DataFusionEngine {
pub async fn new(config: TomlTable) -> Result<Self> {
- let session_config = SessionConfig::new().with_target_partitions(4);
+ let session_config = SessionConfig::new()
+ .with_target_partitions(4)
+ .with_information_schema(true);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);
Ok(Self {
- datafusion: DataFusion::new(ctx, PathBuf::from("testdata"),
ProgressBar::new(100)),
+ test_data_path: PathBuf::from("testdata"),
+ session_context: ctx,
})
}
async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn
CatalogProvider>> {
- todo!()
+ let temp_dir = TempDir::new()?;
Review Comment:
Please create an issue to track this, and add a comment here that refers to
that issue.
##########
crates/sqllogictest/src/engine/mod.rs:
##########
@@ -19,70 +19,94 @@ mod datafusion;
use std::path::Path;
+use anyhow::anyhow;
+use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
use toml::Table as TomlTable;
use crate::engine::datafusion::DataFusionEngine;
-use crate::error::Result;
+use crate::error::{Error, Result};
-const KEY_TYPE: &str = "type";
const TYPE_DATAFUSION: &str = "datafusion";
+const ERRS_PER_FILE_LIMIT: usize = 10;
#[async_trait::async_trait]
-pub trait EngineRunner: Sized {
+pub trait EngineRunner: Send {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}
-pub enum Engine {
- DataFusion(DataFusionEngine),
+pub async fn load_engine_runner(
+ engine_type: &str,
+ cfg: TomlTable,
+) -> Result<Box<dyn EngineRunner>> {
+ match engine_type {
+ TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
+ _ => Err(anyhow::anyhow!("Unsupported engine type: {}",
engine_type).into()),
+ }
}
-impl Engine {
- pub async fn new(config: TomlTable) -> Result<Self> {
- let engine_type = config
- .get(KEY_TYPE)
- .ok_or_else(|| anyhow::anyhow!("Missing required key:
{KEY_TYPE}"))?
- .as_str()
- .ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must
be a string"))?;
-
- match engine_type {
- TYPE_DATAFUSION => {
- let engine = DataFusionEngine::new(config).await?;
- Ok(Engine::DataFusion(engine))
- }
- _ => Err(anyhow::anyhow!("Unsupported engine type:
{engine_type}").into()),
+pub async fn run_slt_with_runner<D, M>(
+ mut runner: Runner<D, M>,
+ step_slt_file: impl AsRef<Path>,
+) -> Result<()>
+where
+ D: AsyncDB + Send + 'static,
+ M: MakeConnection<Conn = D> + Send + 'static,
+{
+ let path = step_slt_file.as_ref().canonicalize()?;
+
+ let records = parse_file(&path).map_err(|e| Error(anyhow!("parsing SLT
file failed: {e}")))?;
+
+ let mut errs = vec![];
+ for record in records {
+ if let Err(err) = runner.run_async(record).await {
Review Comment:
I think we should stop immediately when we encounter an error.
##########
crates/sqllogictest/src/engine/mod.rs:
##########
@@ -19,70 +19,94 @@ mod datafusion;
use std::path::Path;
+use anyhow::anyhow;
+use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
use toml::Table as TomlTable;
use crate::engine::datafusion::DataFusionEngine;
-use crate::error::Result;
+use crate::error::{Error, Result};
-const KEY_TYPE: &str = "type";
const TYPE_DATAFUSION: &str = "datafusion";
+const ERRS_PER_FILE_LIMIT: usize = 10;
#[async_trait::async_trait]
-pub trait EngineRunner: Sized {
+pub trait EngineRunner: Send {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}
-pub enum Engine {
- DataFusion(DataFusionEngine),
+pub async fn load_engine_runner(
+ engine_type: &str,
+ cfg: TomlTable,
+) -> Result<Box<dyn EngineRunner>> {
+ match engine_type {
+ TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
+ _ => Err(anyhow::anyhow!("Unsupported engine type: {}",
engine_type).into()),
+ }
}
-impl Engine {
- pub async fn new(config: TomlTable) -> Result<Self> {
- let engine_type = config
- .get(KEY_TYPE)
- .ok_or_else(|| anyhow::anyhow!("Missing required key:
{KEY_TYPE}"))?
- .as_str()
- .ok_or_else(|| anyhow::anyhow!("Config value for {KEY_TYPE} must
be a string"))?;
-
- match engine_type {
- TYPE_DATAFUSION => {
- let engine = DataFusionEngine::new(config).await?;
- Ok(Engine::DataFusion(engine))
- }
- _ => Err(anyhow::anyhow!("Unsupported engine type:
{engine_type}").into()),
+pub async fn run_slt_with_runner<D, M>(
+ mut runner: Runner<D, M>,
+ step_slt_file: impl AsRef<Path>,
+) -> Result<()>
+where
+ D: AsyncDB + Send + 'static,
+ M: MakeConnection<Conn = D> + Send + 'static,
+{
+ let path = step_slt_file.as_ref().canonicalize()?;
+
+ let records = parse_file(&path).map_err(|e| Error(anyhow!("parsing SLT
file failed: {e}")))?;
+
+ let mut errs = vec![];
+ for record in records {
+ if let Err(err) = runner.run_async(record).await {
Review Comment:
Also, when reporting errors, it would be better to print the line number.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]