This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 14beb79a21 Move `DataSink` to `datasource` and add session crate (#15371)
14beb79a21 is described below
commit 14beb79a21f827c5990f4429db61c486ec260d63
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Mar 26 15:26:17 2025 +0800
Move `DataSink` to `datasource` and add session crate (#15371)
* session
* clippy
* fmt
* session
* minor
* Update README.md
* doc
* fmt
* doc
---------
Co-authored-by: berkaysynnada <[email protected]>
---
Cargo.lock | 32 ++++++++++++++-
Cargo.toml | 2 +
datafusion/catalog-listing/Cargo.toml | 1 +
datafusion/catalog/Cargo.toml | 2 +
datafusion/catalog/src/lib.rs | 30 ++++++++------
datafusion/catalog/src/stream.rs | 2 +-
datafusion/catalog/src/table.rs | 2 +-
datafusion/core/Cargo.toml | 1 +
datafusion/core/src/datasource/dynamic_file.rs | 10 +++--
.../core/src/datasource/file_format/arrow.rs | 2 +-
datafusion/core/src/datasource/memory.rs | 2 +-
datafusion/core/src/datasource/mod.rs | 1 +
datafusion/core/src/execution/context/mod.rs | 35 ++++++++--------
datafusion/core/src/execution/session_state.rs | 5 ++-
datafusion/core/src/lib.rs | 5 +++
datafusion/datasource-avro/Cargo.toml | 1 +
datafusion/datasource-avro/src/file_format.rs | 16 ++++----
datafusion/datasource-csv/Cargo.toml | 1 +
datafusion/datasource-csv/src/file_format.rs | 10 ++---
datafusion/datasource-json/Cargo.toml | 1 +
datafusion/datasource-json/src/file_format.rs | 10 ++---
datafusion/datasource-parquet/Cargo.toml | 1 +
datafusion/datasource-parquet/src/file_format.rs | 30 +++++++-------
datafusion/datasource/Cargo.toml | 2 +-
datafusion/datasource/src/file_format.rs | 6 +--
datafusion/datasource/src/file_sink_config.rs | 9 +++--
datafusion/datasource/src/mod.rs | 1 +
.../src/insert.rs => datasource/src/sink.rs} | 10 ++---
datafusion/datasource/src/url.rs | 6 ++-
datafusion/physical-plan/src/lib.rs | 1 -
datafusion/proto/src/physical_plan/mod.rs | 47 +++++++++++-----------
.../proto/tests/cases/roundtrip_physical_plan.rs | 11 ++---
datafusion/{catalog => session}/Cargo.toml | 4 +-
datafusion/session/LICENSE.txt | 1 +
datafusion/session/NOTICE.txt | 1 +
datafusion/session/README.md | 26 ++++++++++++
datafusion/session/src/lib.rs | 39 ++++++++++++++++++
datafusion/{catalog => session}/src/session.rs | 6 +--
38 files changed, 249 insertions(+), 123 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 7a2d1f8106..3e7064c9a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1830,6 +1830,7 @@ dependencies = [
"datafusion-physical-expr-common",
"datafusion-physical-optimizer",
"datafusion-physical-plan",
+ "datafusion-session",
"datafusion-sql",
"doc-comment",
"env_logger",
@@ -1893,10 +1894,12 @@ dependencies = [
"dashmap",
"datafusion-common",
"datafusion-common-runtime",
+ "datafusion-datasource",
"datafusion-execution",
"datafusion-expr",
"datafusion-physical-expr",
"datafusion-physical-plan",
+ "datafusion-session",
"datafusion-sql",
"futures",
"itertools 0.14.0",
@@ -1920,6 +1923,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
+ "datafusion-session",
"futures",
"log",
"object_store",
@@ -2002,7 +2006,6 @@ dependencies = [
"bytes",
"bzip2 0.5.2",
"chrono",
- "datafusion-catalog",
"datafusion-common",
"datafusion-common-runtime",
"datafusion-execution",
@@ -2010,6 +2013,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
+ "datafusion-session",
"flate2",
"futures",
"glob",
@@ -2042,6 +2046,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
+ "datafusion-session",
"futures",
"num-traits",
"object_store",
@@ -2066,6 +2071,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
+ "datafusion-session",
"futures",
"object_store",
"regex",
@@ -2088,6 +2094,7 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
+ "datafusion-session",
"futures",
"object_store",
"serde_json",
@@ -2113,6 +2120,7 @@ dependencies = [
"datafusion-physical-expr-common",
"datafusion-physical-optimizer",
"datafusion-physical-plan",
+ "datafusion-session",
"futures",
"itertools 0.14.0",
"log",
@@ -2505,6 +2513,28 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "datafusion-session"
+version = "46.0.1"
+dependencies = [
+ "arrow",
+ "async-trait",
+ "dashmap",
+ "datafusion-common",
+ "datafusion-common-runtime",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-expr",
+ "datafusion-physical-plan",
+ "datafusion-sql",
+ "futures",
+ "itertools 0.14.0",
+ "log",
+ "object_store",
+ "parking_lot",
+ "tokio",
+]
+
[[package]]
name = "datafusion-sql"
version = "46.0.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1a7a915ee8..b6164f89d3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ members = [
"datafusion/proto/gen",
"datafusion/proto-common",
"datafusion/proto-common/gen",
+ "datafusion/session",
"datafusion/sql",
"datafusion/sqllogictest",
"datafusion/substrait",
@@ -136,6 +137,7 @@ datafusion-physical-optimizer = { path =
"datafusion/physical-optimizer", versio
datafusion-physical-plan = { path = "datafusion/physical-plan", version =
"46.0.1" }
datafusion-proto = { path = "datafusion/proto", version = "46.0.1" }
datafusion-proto-common = { path = "datafusion/proto-common", version =
"46.0.1" }
+datafusion-session = { path = "datafusion/session", version = "46.0.1" }
datafusion-sql = { path = "datafusion/sql", version = "46.0.1" }
doc-comment = "0.3"
env_logger = "0.11"
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index c6f9ea8bd0..7345802022 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -41,6 +41,7 @@ datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
diff --git a/datafusion/catalog/Cargo.toml b/datafusion/catalog/Cargo.toml
index 113d688253..7307c4de87 100644
--- a/datafusion/catalog/Cargo.toml
+++ b/datafusion/catalog/Cargo.toml
@@ -36,10 +36,12 @@ async-trait = { workspace = true }
dashmap = { workspace = true }
datafusion-common = { workspace = true }
datafusion-common-runtime = { workspace = true }
+datafusion-datasource = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
datafusion-sql = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
diff --git a/datafusion/catalog/src/lib.rs b/datafusion/catalog/src/lib.rs
index a1c0a6185d..f160bddd2b 100644
--- a/datafusion/catalog/src/lib.rs
+++ b/datafusion/catalog/src/lib.rs
@@ -31,26 +31,32 @@
//! * Simple memory based catalog: [`MemoryCatalogProviderList`],
[`MemoryCatalogProvider`], [`MemorySchemaProvider`]
//! * Listing schema: [`listing_schema`]
+pub mod cte_worktable;
+pub mod default_table_source;
+pub mod information_schema;
+pub mod listing_schema;
pub mod memory;
-pub use memory::{
- MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
-};
+pub mod stream;
+pub mod streaming;
+pub mod view;
+
mod r#async;
mod catalog;
mod dynamic_file;
-pub mod information_schema;
-pub mod listing_schema;
mod schema;
-mod session;
mod table;
+
pub use catalog::*;
+pub use datafusion_session::Session;
pub use dynamic_file::catalog::*;
+pub use memory::{
+ MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
+};
pub use r#async::*;
pub use schema::*;
-pub use session::*;
pub use table::*;
-pub mod cte_worktable;
-pub mod default_table_source;
-pub mod stream;
-pub mod streaming;
-pub mod view;
+
+// For backwards compatibility,
+mod session {
+ pub use datafusion_session::Session;
+}
diff --git a/datafusion/catalog/src/stream.rs b/datafusion/catalog/src/stream.rs
index 3fb6724907..fbfab51322 100644
--- a/datafusion/catalog/src/stream.rs
+++ b/datafusion/catalog/src/stream.rs
@@ -30,11 +30,11 @@ use arrow::array::{RecordBatch, RecordBatchReader,
RecordBatchWriter};
use arrow::datatypes::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError,
Result};
use datafusion_common_runtime::SpawnedTask;
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_expr::create_ordering;
-use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index ecc792f73d..207abb9c66 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -288,7 +288,7 @@ pub trait TableProvider: Debug + Sync + Send {
/// See [`DataSinkExec`] for the common pattern of inserting a
/// streams of `RecordBatch`es as files to an ObjectStore.
///
- /// [`DataSinkExec`]: datafusion_physical_plan::insert::DataSinkExec
+ /// [`DataSinkExec`]: datafusion_datasource::sink::DataSinkExec
async fn insert_into(
&self,
_state: &dyn Session,
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 3073abf8bb..56698e4d7e 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -123,6 +123,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
datafusion-sql = { workspace = true }
flate2 = { version = "1.1.0", optional = true }
futures = { workspace = true }
diff --git a/datafusion/core/src/datasource/dynamic_file.rs b/datafusion/core/src/datasource/dynamic_file.rs
index 6654d0871c..b30d53e586 100644
--- a/datafusion/core/src/datasource/dynamic_file.rs
+++ b/datafusion/core/src/datasource/dynamic_file.rs
@@ -20,15 +20,17 @@
use std::sync::Arc;
-use async_trait::async_trait;
-use datafusion_catalog::{SessionStore, UrlTableFactory};
-use datafusion_common::plan_datafusion_err;
-
use crate::datasource::listing::{ListingTable, ListingTableConfig,
ListingTableUrl};
use crate::datasource::TableProvider;
use crate::error::Result;
use crate::execution::context::SessionState;
+use datafusion_catalog::UrlTableFactory;
+use datafusion_common::plan_datafusion_err;
+use datafusion_session::SessionStore;
+
+use async_trait::async_trait;
+
/// [DynamicListTableFactory] is a factory that can create a [ListingTable]
from the given url.
#[derive(Default, Debug)]
pub struct DynamicListTableFactory {
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index d0b9df403b..7c41855110 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -50,11 +50,11 @@ use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
-use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use async_trait::async_trait;
use bytes::Bytes;
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index 23d09719f2..27352c5146 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -25,7 +25,6 @@ use std::sync::Arc;
use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::logical_expr::Expr;
-use crate::physical_plan::insert::{DataSink, DataSinkExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::{
common, DisplayAs, DisplayFormatType, ExecutionPlan,
ExecutionPlanProperties,
@@ -39,6 +38,7 @@ use datafusion_catalog::Session;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema,
SchemaExt};
use datafusion_common_runtime::JoinSet;
pub use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
pub use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::TaskContext;
use datafusion_expr::dml::InsertOp;
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index a932ae76c6..b37198ff93 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -43,6 +43,7 @@ pub use datafusion_catalog::default_table_source;
pub use datafusion_catalog::stream;
pub use datafusion_catalog::view;
pub use datafusion_datasource::schema_adapter;
+pub use datafusion_datasource::sink;
pub use datafusion_datasource::source;
pub use datafusion_execution::object_store;
pub use datafusion_physical_expr::create_ordering;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 714e94234a..fc110a0699 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -17,13 +17,13 @@
//! [`SessionContext`] API for registering data sources and executing queries
-use datafusion_catalog::memory::MemorySchemaProvider;
-use datafusion_catalog::MemoryCatalogProvider;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Weak};
use super::options::ReadOptions;
+use crate::datasource::dynamic_file::DynamicListTableFactory;
+use crate::execution::session_state::SessionStateBuilder;
use crate::{
catalog::listing_schema::ListingSchemaProvider,
catalog::{
@@ -49,39 +49,40 @@ use crate::{
variable::{VarProvider, VarType},
};
+// backwards compatibility
+pub use crate::execution::session_state::SessionState;
+
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
+use datafusion_catalog::memory::MemorySchemaProvider;
+use datafusion_catalog::MemoryCatalogProvider;
+use datafusion_catalog::{
+ DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory,
+};
+use datafusion_common::config::ConfigOptions;
use datafusion_common::{
config::{ConfigExtension, TableOptions},
exec_datafusion_err, exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
DFSchema, ParamValues, ScalarValue, SchemaReference, TableReference,
};
+pub use datafusion_execution::config::SessionConfig;
use datafusion_execution::registry::SerializerRegistry;
+pub use datafusion_execution::TaskContext;
+pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
expr_rewriter::FunctionRewrite,
logical_plan::{DdlStatement, Statement},
planner::ExprPlanner,
Expr, UserDefinedLogicalNode, WindowUDF,
};
-
-// backwards compatibility
-pub use crate::execution::session_state::SessionState;
-
-use crate::datasource::dynamic_file::DynamicListTableFactory;
-use crate::execution::session_state::SessionStateBuilder;
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use datafusion_catalog::{
- DynamicFileCatalog, SessionStore, TableFunction, TableFunctionImpl,
UrlTableFactory,
-};
-use datafusion_common::config::ConfigOptions;
-pub use datafusion_execution::config::SessionConfig;
-pub use datafusion_execution::TaskContext;
-pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion_optimizer::Analyzer;
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
+use datafusion_session::SessionStore;
+
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index 515163102c..28f599304f 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -33,10 +33,10 @@ use crate::physical_planner::{DefaultPhysicalPlanner,
PhysicalPlanner};
use datafusion_catalog::information_schema::{
InformationSchemaProvider, INFORMATION_SCHEMA,
};
-use datafusion_catalog::MemoryCatalogProviderList;
use arrow::datatypes::{DataType, SchemaRef};
-use datafusion_catalog::{Session, TableFunction, TableFunctionImpl};
+use datafusion_catalog::MemoryCatalogProviderList;
+use datafusion_catalog::{TableFunction, TableFunctionImpl};
use datafusion_common::alias::AliasGenerator;
use datafusion_common::config::{ConfigExtension, ConfigOptions, TableOptions};
use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
@@ -68,6 +68,7 @@ use
datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
+use datafusion_session::Session;
use datafusion_sql::parser::{DFParserBuilder, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext,
SqlToRel};
diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs
index 803ec66eaa..4ed1726fc2 100644
--- a/datafusion/core/src/lib.rs
+++ b/datafusion/core/src/lib.rs
@@ -649,6 +649,8 @@
//!
//! * [datafusion_common]: Common traits and types
//! * [datafusion_catalog]: Catalog APIs such as [`SchemaProvider`] and
[`CatalogProvider`]
+//! * [datafusion_datasource]: File and Data IO such as [`FileSource`] and
[`DataSink`]
+//! * [datafusion_session]: [`Session`] and related structures
//! * [datafusion_execution]: State and structures needed for execution
//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical
planning structure
//! * [datafusion_functions]: Scalar function packages
@@ -664,6 +666,9 @@
//!
//! [`SchemaProvider`]: datafusion_catalog::SchemaProvider
//! [`CatalogProvider`]: datafusion_catalog::CatalogProvider
+//! [`Session`]: datafusion_session::Session
+//! [`FileSource`]: datafusion_datasource::file::FileSource
+//! [`DataSink`]: datafusion_datasource::sink::DataSink
//!
//! ## Citing DataFusion in Academic Papers
//!
diff --git a/datafusion/datasource-avro/Cargo.toml b/datafusion/datasource-avro/Cargo.toml
index e6bb2ef4d5..064f9f87ee 100644
--- a/datafusion/datasource-avro/Cargo.toml
+++ b/datafusion/datasource-avro/Cargo.toml
@@ -43,6 +43,7 @@ datafusion-execution = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
futures = { workspace = true }
num-traits = { version = "0.2" }
object_store = { workspace = true }
diff --git a/datafusion/datasource-avro/src/file_format.rs b/datafusion/datasource-avro/src/file_format.rs
index 00a96121aa..779b610fe4 100644
--- a/datafusion/datasource-avro/src/file_format.rs
+++ b/datafusion/datasource-avro/src/file_format.rs
@@ -22,26 +22,26 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
-use datafusion_common::{Result, Statistics};
-use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
+use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::source::AvroSource;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
-use async_trait::async_trait;
-use datafusion_catalog::Session;
use datafusion_common::internal_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::GetExt;
use datafusion_common::DEFAULT_AVRO_EXTENSION;
+use datafusion_common::{Result, Statistics};
use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::ExecutionPlan;
-use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
+use datafusion_session::Session;
-use crate::avro_to_arrow::read_avro_schema_from_reader;
-use crate::source::AvroSource;
+use async_trait::async_trait;
+use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
#[derive(Default)]
/// Factory struct used to create [`AvroFormat`]
diff --git a/datafusion/datasource-csv/Cargo.toml b/datafusion/datasource-csv/Cargo.toml
index b95c51cbbe..c9e4649bdc 100644
--- a/datafusion/datasource-csv/Cargo.toml
+++ b/datafusion/datasource-csv/Cargo.toml
@@ -43,6 +43,7 @@ datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
regex = { workspace = true }
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index 6cd6a83884..9896643106 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -22,11 +22,12 @@ use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug};
use std::sync::Arc;
+use crate::source::CsvSource;
+
use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use arrow::error::ArrowError;
-use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, CsvOptions};
use datafusion_common::file_options::csv_writer::CsvWriterOptions;
use datafusion_common::{
@@ -43,6 +44,7 @@ use datafusion_datasource::file_format::{
};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_datasource::write::BatchSerializer;
@@ -50,18 +52,16 @@ use datafusion_execution::{SendableRecordBatchStream,
TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_session::Session;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
-use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
-use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
use regex::Regex;
-use crate::source::CsvSource;
-
#[derive(Default)]
/// Factory used to create [`CsvFormat`]
pub struct CsvFormatFactory {
diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml
index 78547c592b..6c74923ff7 100644
--- a/datafusion/datasource-json/Cargo.toml
+++ b/datafusion/datasource-json/Cargo.toml
@@ -43,6 +43,7 @@ datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
futures = { workspace = true }
object_store = { workspace = true }
serde_json = { workspace = true }
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index 054c05b5b0..00019e83c4 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -24,12 +24,13 @@ use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
+use crate::source::JsonSource;
+
use arrow::array::RecordBatch;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::json;
use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
-use datafusion_catalog::Session;
use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{
@@ -45,22 +46,21 @@ use datafusion_datasource::file_format::{
};
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
use datafusion_datasource::write::BatchSerializer;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
-use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_session::Session;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
-use crate::source::JsonSource;
-
#[derive(Default)]
/// Factory struct used to create [JsonFormat]
pub struct JsonFormatFactory {
diff --git a/datafusion/datasource-parquet/Cargo.toml b/datafusion/datasource-parquet/Cargo.toml
index 8aa041b7a4..b6a548c998 100644
--- a/datafusion/datasource-parquet/Cargo.toml
+++ b/datafusion/datasource-parquet/Cargo.toml
@@ -45,6 +45,7 @@ datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs
index b7a8712428..98ba24df97 100644
--- a/datafusion/datasource-parquet/src/file_format.rs
+++ b/datafusion/datasource-parquet/src/file_format.rs
@@ -23,20 +23,13 @@ use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
-use arrow::array::RecordBatch;
-use arrow::datatypes::{Fields, Schema, SchemaRef};
-use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
-use datafusion_datasource::write::{create_writer, get_writer_schema,
SharedBuffer};
-
-use datafusion_datasource::file_format::{
- FileFormat, FileFormatFactory, FilePushdownSupport,
-};
-use datafusion_datasource::write::demux::DemuxedStreamReceiver;
+use crate::can_expr_be_pushed_down_with_schemas;
+use crate::source::ParquetSource;
+use arrow::array::RecordBatch;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, FieldRef};
-use datafusion_catalog::Session;
+use arrow::datatypes::{Fields, Schema, SchemaRef};
use datafusion_common::config::{ConfigField, ConfigFileType,
TableParquetOptions};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
@@ -48,7 +41,15 @@ use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
+use datafusion_datasource::file_compression_type::FileCompressionType;
+use datafusion_datasource::file_format::{
+ FileFormat, FileFormatFactory, FilePushdownSupport,
+};
use datafusion_datasource::file_scan_config::FileScanConfig;
+use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
+use datafusion_datasource::sink::{DataSink, DataSinkExec};
+use datafusion_datasource::write::demux::DemuxedStreamReceiver;
+use datafusion_datasource::write::{create_writer, get_writer_schema,
SharedBuffer};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool,
MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
@@ -57,11 +58,11 @@ use
datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::Accumulator;
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion_session::Session;
use async_trait::async_trait;
use bytes::Bytes;
-use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
-use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::debug;
@@ -83,9 +84,6 @@ use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
-use crate::can_expr_be_pushed_down_with_schemas;
-use crate::source::ParquetSource;
-
/// Initial writing buffer size. Note this is just a size hint for efficiency.
It
/// will grow beyond the set value if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;
diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml
index 922b36aa5f..2132272b57 100644
--- a/datafusion/datasource/Cargo.toml
+++ b/datafusion/datasource/Cargo.toml
@@ -48,7 +48,6 @@ async-trait = { workspace = true }
bytes = { workspace = true }
bzip2 = { version = "0.5.2", optional = true }
chrono = { workspace = true }
-datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, features = ["object_store"] }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
@@ -56,6 +55,7 @@ datafusion-expr = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
+datafusion-session = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
diff --git a/datafusion/datasource/src/file_format.rs b/datafusion/datasource/src/file_format.rs
index aa0338fab7..0e0b7b12e1 100644
--- a/datafusion/datasource/src/file_format.rs
+++ b/datafusion/datasource/src/file_format.rs
@@ -29,14 +29,14 @@ use crate::file_scan_config::FileScanConfig;
use crate::file_sink_config::FileSinkConfig;
use arrow::datatypes::{Schema, SchemaRef};
-use async_trait::async_trait;
-use datafusion_catalog::Session;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt, Result,
Statistics};
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::ExecutionPlan;
+use datafusion_session::Session;
+use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
/// Default max records to scan to infer the schema
@@ -46,7 +46,7 @@ pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the same file formats.
///
-/// [`TableProvider`]: datafusion_catalog::TableProvider
+/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
/// Returns the table provider as [`Any`](std::any::Any) so that it can be
diff --git a/datafusion/datasource/src/file_sink_config.rs
b/datafusion/datasource/src/file_sink_config.rs
index dbff28692c..465167fea9 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -15,19 +15,22 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use crate::file_groups::FileGroup;
+use crate::sink::DataSink;
use crate::write::demux::{start_demuxer_task, DemuxedStreamReceiver};
use crate::ListingTableUrl;
+
use arrow::datatypes::{DataType, SchemaRef};
-use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
-use datafusion_physical_plan::insert::DataSink;
+
+use async_trait::async_trait;
use object_store::ObjectStore;
-use std::sync::Arc;
/// General behaviors for files that do `DataSink` operations
#[async_trait]
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 15e25ca386..fb119d1b3d 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -39,6 +39,7 @@ pub mod file_sink_config;
pub mod file_stream;
pub mod memory;
pub mod schema_adapter;
+pub mod sink;
pub mod source;
mod statistics;
diff --git a/datafusion/physical-plan/src/insert.rs
b/datafusion/datasource/src/sink.rs
similarity index 97%
rename from datafusion/physical-plan/src/insert.rs
rename to datafusion/datasource/src/sink.rs
index 5272f0ab18..0552370d8e 100644
--- a/datafusion/physical-plan/src/insert.rs
+++ b/datafusion/datasource/src/sink.rs
@@ -22,22 +22,22 @@ use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
-use super::{
+use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion_physical_plan::ExecutionPlanProperties;
+use datafusion_physical_plan::{
execute_input_stream, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning,
PlanProperties, SendableRecordBatchStream,
};
-use crate::metrics::MetricsSet;
-use crate::stream::RecordBatchStreamAdapter;
-use crate::ExecutionPlanProperties;
use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{internal_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{Distribution, EquivalenceProperties};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
use async_trait::async_trait;
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::StreamExt;
/// `DataSink` implements writing streams of [`RecordBatch`]es to
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 0667a043e2..2dbcfa2ef1 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_catalog::Session;
+use std::sync::Arc;
+
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_session::Session;
+
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
@@ -26,7 +29,6 @@ use log::debug;
use object_store::path::Path;
use object_store::path::DELIMITER;
use object_store::{ObjectMeta, ObjectStore};
-use std::sync::Arc;
use url::Url;
/// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index bc2017fb20..04fbd06fab 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -66,7 +66,6 @@ pub mod empty;
pub mod execution_plan;
pub mod explain;
pub mod filter;
-pub mod insert;
pub mod joins;
pub mod limit;
pub mod memory;
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 6562a9be45..14b169629f 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -18,9 +18,25 @@
use std::fmt::Debug;
use std::sync::Arc;
-use datafusion::physical_expr::aggregate::AggregateExprBuilder;
-use prost::bytes::BufMut;
-use prost::Message;
+use self::from_proto::parse_protobuf_partitioning;
+use self::to_proto::{serialize_partitioning, serialize_physical_expr};
+use crate::common::{byte_to_string, str_to_byte};
+use crate::physical_plan::from_proto::{
+ parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
+ parse_physical_window_expr, parse_protobuf_file_scan_config,
+ parse_protobuf_file_scan_schema,
+};
+use crate::physical_plan::to_proto::{
+ serialize_file_scan_config, serialize_maybe_filter,
serialize_physical_aggr_expr,
+ serialize_physical_window_expr,
+};
+use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
+use crate::protobuf::physical_expr_node::ExprType;
+use crate::protobuf::physical_plan_node::PhysicalPlanType;
+use crate::protobuf::{
+ self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
+};
+use crate::{convert_required, into_required};
use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::SchemaRef;
@@ -34,9 +50,11 @@ use datafusion::datasource::physical_plan::AvroSource;
#[cfg(feature = "parquet")]
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::datasource::physical_plan::{CsvSource, FileScanConfig,
JsonSource};
+use datafusion::datasource::sink::DataSinkExec;
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
+use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
use datafusion::physical_plan::aggregates::AggregateMode;
@@ -48,7 +66,6 @@ use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion::physical_plan::joins::{
CrossJoinExec, NestedLoopJoinExec, StreamJoinPartitionMode,
SymmetricHashJoinExec,
@@ -70,26 +87,8 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::{internal_err, not_impl_err, DataFusionError, Result};
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
-use crate::common::{byte_to_string, str_to_byte};
-use crate::physical_plan::from_proto::{
- parse_physical_expr, parse_physical_sort_expr, parse_physical_sort_exprs,
- parse_physical_window_expr, parse_protobuf_file_scan_config,
- parse_protobuf_file_scan_schema,
-};
-use crate::physical_plan::to_proto::{
- serialize_file_scan_config, serialize_maybe_filter,
serialize_physical_aggr_expr,
- serialize_physical_window_expr,
-};
-use crate::protobuf::physical_aggregate_expr_node::AggregateFunction;
-use crate::protobuf::physical_expr_node::ExprType;
-use crate::protobuf::physical_plan_node::PhysicalPlanType;
-use crate::protobuf::{
- self, proto_error, window_agg_exec_node, ListUnnest as ProtoListUnnest,
-};
-use crate::{convert_required, into_required};
-
-use self::from_proto::parse_protobuf_partitioning;
-use self::to_proto::{serialize_partitioning, serialize_physical_expr};
+use prost::bytes::BufMut;
+use prost::Message;
pub mod from_proto;
pub mod to_proto;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 57d10e38ee..1e5543c05b 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -21,6 +21,11 @@ use std::ops::Deref;
use std::sync::Arc;
use std::vec;
+use crate::cases::{
+ CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf,
+ MyRegexUdfNode,
+};
+
use arrow::array::RecordBatch;
use arrow::csv::WriterBuilder;
use arrow::datatypes::{Fields, TimeUnit};
@@ -32,10 +37,6 @@ use
datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_functions_aggregate::min_max::max_udaf;
use prost::Message;
-use crate::cases::{
- CustomUDWF, CustomUDWFNode, MyAggregateUDF, MyAggregateUdfNode, MyRegexUdf,
- MyRegexUdfNode,
-};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::kernels::sort::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema};
@@ -49,6 +50,7 @@ use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup,
FileScanConfig,
FileSinkConfig, FileSource, ParquetSource,
};
+use datafusion::datasource::sink::DataSinkExec;
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_window::nth_value::nth_value_udwf;
@@ -68,7 +70,6 @@ use datafusion::physical_plan::expressions::{
binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr,
PhysicalSortExpr,
};
use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::insert::DataSinkExec;
use datafusion::physical_plan::joins::{
HashJoinExec, NestedLoopJoinExec, PartitionMode, StreamJoinPartitionMode,
};
diff --git a/datafusion/catalog/Cargo.toml b/datafusion/session/Cargo.toml
similarity index 96%
copy from datafusion/catalog/Cargo.toml
copy to datafusion/session/Cargo.toml
index 113d688253..c6e268735a 100644
--- a/datafusion/catalog/Cargo.toml
+++ b/datafusion/session/Cargo.toml
@@ -16,8 +16,8 @@
# under the License.
[package]
-name = "datafusion-catalog"
-description = "datafusion-catalog"
+name = "datafusion-session"
+description = "datafusion-session"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
diff --git a/datafusion/session/LICENSE.txt b/datafusion/session/LICENSE.txt
new file mode 120000
index 0000000000..1ef648f64b
--- /dev/null
+++ b/datafusion/session/LICENSE.txt
@@ -0,0 +1 @@
+../../LICENSE.txt
\ No newline at end of file
diff --git a/datafusion/session/NOTICE.txt b/datafusion/session/NOTICE.txt
new file mode 120000
index 0000000000..fb051c92b1
--- /dev/null
+++ b/datafusion/session/NOTICE.txt
@@ -0,0 +1 @@
+../../NOTICE.txt
\ No newline at end of file
diff --git a/datafusion/session/README.md b/datafusion/session/README.md
new file mode 100644
index 0000000000..019f9f8892
--- /dev/null
+++ b/datafusion/session/README.md
@@ -0,0 +1,26 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Session
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
+
+This crate provides **session-related abstractions** used in the DataFusion query engine. A _session_ represents the runtime context for query execution, including configuration, runtime environment, function registry, and query planning.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/session/src/lib.rs b/datafusion/session/src/lib.rs
new file mode 100644
index 0000000000..a2e1d9ca3a
--- /dev/null
+++ b/datafusion/session/src/lib.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Session management for DataFusion query execution environment
+//!
+//! This crate provides the core session management functionality for DataFusion,
+//! shared by both the catalog (table) and data source (file) layers. It defines
+//! the fundamental interfaces and implementations for maintaining query execution
+//! state and configuration.
+//!
+//! Key components:
+//! * [`Session`] - Interface for accessing the session state needed to plan and
+//!   execute queries, such as configuration, functions, and runtime environment
+//! * [`SessionStore`] - Holds a reference to a [`Session`] so that it can be
+//!   retrieved later
+//!
+//! The session system enables:
+//! * Configuration management for query execution
+//! * Catalog and schema management
+//! * Function registry access
+//! * Runtime environment configuration
+//! * Sharing query state with catalog and data source implementations
+
+pub mod session;
+
+pub use crate::session::{Session, SessionStore};
diff --git a/datafusion/catalog/src/session.rs
b/datafusion/session/src/session.rs
similarity index 96%
rename from datafusion/catalog/src/session.rs
rename to datafusion/session/src/session.rs
index 88b9669cff..de23dba491 100644
--- a/datafusion/catalog/src/session.rs
+++ b/datafusion/session/src/session.rs
@@ -29,7 +29,7 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
-/// Interface for accessing [`SessionState`] from the catalog.
+/// Interface for accessing [`SessionState`] from the catalog and data source.
///
/// This trait provides access to the information needed to plan and execute
/// queries, such as configuration, functions, and runtime environment. See the
@@ -51,7 +51,7 @@ use std::sync::{Arc, Weak};
/// the required information.
///
/// ```
-/// # use datafusion_catalog::Session;
+/// # use datafusion_session::Session;
/// # use datafusion_common::{Result, exec_datafusion_err};
/// # struct SessionState {}
/// // Given a `Session` reference, get the concrete `SessionState` reference
@@ -64,7 +64,7 @@ use std::sync::{Arc, Weak};
/// ```
///
/// [`SessionState`]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionState.html
-/// [`TableProvider`]: crate::TableProvider
+/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html
#[async_trait]
pub trait Session: Send + Sync {
/// Return the session ID
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]