This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new d2f33820 Upgrade DataFusion to 24.0.0 (#769)
d2f33820 is described below
commit d2f33820c108f63cd3a31e425bc15c7e9fec255b
Author: r.4ntix <[email protected]>
AuthorDate: Sun May 14 22:27:08 2023 +0800
Upgrade DataFusion to 24.0.0 (#769)
---
Cargo.toml | 17 +++++++----------
ballista/client/src/context.rs | 28 ++++++++++++++++------------
ballista/core/src/utils.rs | 4 ++--
ballista/scheduler/src/flight_sql.rs | 27 +++++++++++++++++++++++++--
4 files changed, 50 insertions(+), 26 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 39233942..86292246 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,20 +19,17 @@
members = ["ballista-cli", "ballista/client", "ballista/core",
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
[workspace.dependencies]
-arrow = { version = "37.0.0" }
-arrow-flight = { version = "37.0.0", features = ["flight-sql-experimental"] }
+arrow = { version = "38.0.0" }
+arrow-flight = { version = "38.0.0", features = ["flight-sql-experimental"] }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "23.0.0"
-datafusion-cli = "23.0.0"
-datafusion-proto = "23.0.0"
-object_store = "0.5.4"
+datafusion = "24.0.0"
+datafusion-cli = "24.0.0"
+datafusion-proto = "24.0.0"
+object_store = "0.5.6"
sqlparser = "0.33.0"
tonic = { version = "0.9" }
-tonic-build = { version = "0.9", default-features = false, features = [
- "transport",
- "prost",
-] }
+tonic-build = { version = "0.9", default-features = false, features = ["transport", "prost"] }
# cargo build --profile release-lto
[profile.release-lto]
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index dea233d7..99b7d9b1 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -38,7 +38,9 @@ use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::{source_as_provider, TableProvider};
use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_expr::{CreateExternalTable, LogicalPlan, TableScan};
+use datafusion::logical_expr::{
+ CreateExternalTable, DdlStatement, LogicalPlan, TableScan,
+};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
SessionConfig, SessionContext,
@@ -389,17 +391,19 @@ impl BallistaContext {
let plan = ctx.state().create_logical_plan(sql).await?;
match plan {
- LogicalPlan::CreateExternalTable(CreateExternalTable {
- ref schema,
- ref name,
- ref location,
- ref file_type,
- ref has_header,
- ref delimiter,
- ref table_partition_cols,
- ref if_not_exists,
- ..
- }) => {
+ LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
+ CreateExternalTable {
+ ref schema,
+ ref name,
+ ref location,
+ ref file_type,
+ ref has_header,
+ ref delimiter,
+ ref table_partition_cols,
+ ref if_not_exists,
+ ..
+ },
+ )) => {
let table_exists = ctx.table_exist(name)?;
let schema: SchemaRef =
Arc::new(schema.as_ref().to_owned().into());
let table_partition_cols = table_partition_cols
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 6745b529..7c02ea02 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -32,7 +32,7 @@ use datafusion::execution::context::{
QueryPlanner, SessionConfig, SessionContext, SessionState,
};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -415,7 +415,7 @@ impl<T: 'static + AsLogicalPlan> QueryPlanner for BallistaQueryPlanner<T> {
session_state: &SessionState,
) -> std::result::Result<Arc<dyn ExecutionPlan>, DataFusionError> {
match logical_plan {
- LogicalPlan::CreateExternalTable(_) => {
+ LogicalPlan::Ddl(DdlStatement::CreateExternalTable(_)) => {
// table state is managed locally in the BallistaContext, not in the scheduler
Ok(Arc::new(EmptyExec::new(false, Arc::new(Schema::empty()))))
}
diff --git a/ballista/scheduler/src/flight_sql.rs b/ballista/scheduler/src/flight_sql.rs
index ca755fd8..db13b518 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -23,8 +23,9 @@ use arrow_flight::sql::{
ActionCreatePreparedStatementResult, CommandGetCatalogs,
CommandGetCrossReference,
CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables,
- CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
- CommandStatementUpdate, SqlInfo, TicketStatementQuery,
+ CommandGetXdbcTypeInfo, CommandPreparedStatementQuery,
+ CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementUpdate,
+ SqlInfo, TicketStatementQuery,
};
use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest,
@@ -597,6 +598,16 @@ impl FlightSqlService for FlightSqlServiceImpl {
Ok(Response::new(Box::pin(stream)))
}
+ /// Get a FlightDataStream containing the data related to the supported XDBC types.
+ async fn do_get_xdbc_type_info(
+ &self,
+ _query: CommandGetXdbcTypeInfo,
+ _request: Request<Ticket>,
+ ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
+ debug!("do_get_xdbc_type_info");
+ Err(Status::unimplemented("Implement do_get_xdbc_type_info"))
+ }
+
async fn get_flight_info_statement(
&self,
query: CommandStatementQuery,
@@ -721,6 +732,18 @@ impl FlightSqlService for FlightSqlServiceImpl {
))
}
+ /// Get a FlightInfo to extract information about the supported XDBC types.
+ async fn get_flight_info_xdbc_type_info(
+ &self,
+ _query: CommandGetXdbcTypeInfo,
+ _request: Request<FlightDescriptor>,
+ ) -> Result<Response<FlightInfo>, Status> {
+ debug!("get_flight_info_xdbc_type_info");
+ Err(Status::unimplemented(
+ "Implement get_flight_info_xdbc_type_info",
+ ))
+ }
+
async fn do_get_statement(
&self,
_ticket: TicketStatementQuery,