This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f530dd198 Update substrait requirement from 0.49 to 0.50 (#13808)
9f530dd198 is described below

commit 9f530dd1987d863dd4de99198df5b2c3e242bf6e
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 19 08:37:43 2024 -0500

    Update substrait requirement from 0.49 to 0.50 (#13808)
    
    * Update substrait requirement from 0.49 to 0.50
    
    Updates the requirements on [substrait](https://github.com/substrait-io/substrait-rs) to permit the latest version.
    - [Release notes](https://github.com/substrait-io/substrait-rs/releases)
    - [Changelog](https://github.com/substrait-io/substrait-rs/blob/main/CHANGELOG.md)
    - [Commits](https://github.com/substrait-io/substrait-rs/compare/v0.49.0...v0.50.0)
    
    ---
    updated-dependencies:
    - dependency-name: substrait
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    
    * Fix compilation
    
    * Add expr test
    
    ---------
    
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: jonahgao <[email protected]>
---
 datafusion/substrait/Cargo.toml                    |  2 +-
 datafusion/substrait/src/logical_plan/consumer.rs  | 31 ++++++++++++-----
 datafusion/substrait/src/logical_plan/producer.rs  | 39 ++++++++++++++--------
 .../tests/cases/roundtrip_logical_plan.rs          |  9 +++--
 4 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/datafusion/substrait/Cargo.toml b/datafusion/substrait/Cargo.toml
index ab4d386f05..09c6e0351e 100644
--- a/datafusion/substrait/Cargo.toml
+++ b/datafusion/substrait/Cargo.toml
@@ -42,7 +42,7 @@ object_store = { workspace = true }
 pbjson-types = "0.7"
 # TODO use workspace version
 prost = "0.13"
-substrait = { version = "0.49", features = ["serde"] }
+substrait = { version = "0.50", features = ["serde"] }
 url = { workspace = true }
 
 [dev-dependencies]
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs 
b/datafusion/substrait/src/logical_plan/consumer.rs
index a9e411e35a..9f98fdace6 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -62,7 +62,7 @@ use datafusion::logical_expr::{
     col, expr, Cast, Extension, GroupingSet, Like, LogicalPlanBuilder, 
Partitioning,
     Repartition, Subquery, WindowFrameBound, WindowFrameUnits, 
WindowFunctionDefinition,
 };
-use datafusion::prelude::JoinType;
+use datafusion::prelude::{lit, JoinType};
 use datafusion::sql::TableReference;
 use datafusion::{
     error::Result, logical_expr::utils::split_conjunction, prelude::Column,
@@ -98,7 +98,7 @@ use substrait::proto::{
     sort_field::{SortDirection, SortKind::*},
     AggregateFunction, Expression, NamedStruct, Plan, Rel, RelCommon, Type,
 };
-use substrait::proto::{ExtendedExpression, FunctionArgument, SortField};
+use substrait::proto::{fetch_rel, ExtendedExpression, FunctionArgument, 
SortField};
 
 use super::state::SubstraitPlanningState;
 
@@ -640,14 +640,27 @@ pub async fn from_substrait_rel(
                 let input = LogicalPlanBuilder::from(
                     from_substrait_rel(state, input, extensions).await?,
                 );
-                let offset = fetch.offset as usize;
-                // -1 means that ALL records should be returned
-                let count = if fetch.count == -1 {
-                    None
-                } else {
-                    Some(fetch.count as usize)
+                let empty_schema = DFSchemaRef::new(DFSchema::empty());
+                let offset = match &fetch.offset_mode {
+                    Some(fetch_rel::OffsetMode::Offset(offset)) => 
Some(lit(*offset)),
+                    Some(fetch_rel::OffsetMode::OffsetExpr(expr)) => Some(
+                        from_substrait_rex(state, expr, &empty_schema, 
extensions)
+                            .await?,
+                    ),
+                    None => None,
+                };
+                let count = match &fetch.count_mode {
+                    Some(fetch_rel::CountMode::Count(count)) => {
+                        // -1 means that ALL records should be returned, 
equivalent to None
+                        (*count != -1).then(|| lit(*count))
+                    }
+                    Some(fetch_rel::CountMode::CountExpr(expr)) => Some(
+                        from_substrait_rex(state, expr, &empty_schema, 
extensions)
+                            .await?,
+                    ),
+                    None => None,
                 };
-                input.limit(offset, count)?.build()
+                input.limit_by_expr(offset, count)?.build()
             } else {
                 not_impl_err!("Fetch without an input is not valid")
             }
diff --git a/datafusion/substrait/src/logical_plan/producer.rs 
b/datafusion/substrait/src/logical_plan/producer.rs
index a128b90e68..375cb734f5 100644
--- a/datafusion/substrait/src/logical_plan/producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer.rs
@@ -22,9 +22,7 @@ use std::sync::Arc;
 use substrait::proto::expression_reference::ExprType;
 
 use datafusion::arrow::datatypes::{Field, IntervalUnit};
-use datafusion::logical_expr::{
-    Distinct, FetchType, Like, Partitioning, SkipType, TryCast, 
WindowFrameUnits,
-};
+use datafusion::logical_expr::{Distinct, Like, Partitioning, TryCast, 
WindowFrameUnits};
 use datafusion::{
     arrow::datatypes::{DataType, TimeUnit},
     error::{DataFusionError, Result},
@@ -45,7 +43,7 @@ use datafusion::arrow::array::{Array, GenericListArray, 
OffsetSizeTrait};
 use datafusion::arrow::temporal_conversions::NANOSECONDS;
 use datafusion::common::{
     exec_err, internal_err, not_impl_err, plan_err, substrait_datafusion_err,
-    substrait_err, DFSchemaRef, ToDFSchema,
+    substrait_err, DFSchema, DFSchemaRef, ToDFSchema,
 };
 #[allow(unused_imports)]
 use datafusion::logical_expr::expr::{
@@ -69,7 +67,8 @@ use substrait::proto::read_rel::VirtualTable;
 use substrait::proto::rel_common::EmitKind;
 use substrait::proto::rel_common::EmitKind::Emit;
 use substrait::proto::{
-    rel_common, ExchangeRel, ExpressionReference, ExtendedExpression, 
RelCommon,
+    fetch_rel, rel_common, ExchangeRel, ExpressionReference, 
ExtendedExpression,
+    RelCommon,
 };
 use substrait::{
     proto::{
@@ -333,19 +332,31 @@ pub fn to_substrait_rel(
         }
         LogicalPlan::Limit(limit) => {
             let input = to_substrait_rel(limit.input.as_ref(), state, 
extensions)?;
-            let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
-                return not_impl_err!("Non-literal limit fetch");
-            };
-            let SkipType::Literal(skip) = limit.get_skip_type()? else {
-                return not_impl_err!("Non-literal limit skip");
-            };
+            let empty_schema = Arc::new(DFSchema::empty());
+            let offset_mode = limit
+                .skip
+                .as_ref()
+                .map(|expr| {
+                    to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, 
extensions)
+                })
+                .transpose()?
+                .map(Box::new)
+                .map(fetch_rel::OffsetMode::OffsetExpr);
+            let count_mode = limit
+                .fetch
+                .as_ref()
+                .map(|expr| {
+                    to_substrait_rex(state, expr.as_ref(), &empty_schema, 0, 
extensions)
+                })
+                .transpose()?
+                .map(Box::new)
+                .map(fetch_rel::CountMode::CountExpr);
             Ok(Box::new(Rel {
                 rel_type: Some(RelType::Fetch(Box::new(FetchRel {
                     common: None,
                     input: Some(input),
-                    offset: skip as i64,
-                    // use -1 to signal that ALL records should be returned
-                    count: fetch.map(|f| f as i64).unwrap_or(-1),
+                    offset_mode,
+                    count_mode,
                     advanced_extension: None,
                 }))),
             }))
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index f836dea03c..1291bbd6a2 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -240,17 +240,20 @@ async fn select_with_filter_bool_expr() -> Result<()> {
 
 #[tokio::test]
 async fn select_with_limit() -> Result<()> {
-    roundtrip_fill_na("SELECT * FROM data LIMIT 100").await
+    roundtrip_fill_na("SELECT * FROM data LIMIT 100").await?;
+    roundtrip_fill_na("SELECT * FROM data LIMIT 98+100/50").await
 }
 
 #[tokio::test]
 async fn select_without_limit() -> Result<()> {
-    roundtrip_fill_na("SELECT * FROM data OFFSET 10").await
+    roundtrip_fill_na("SELECT * FROM data OFFSET 10").await?;
+    roundtrip_fill_na("SELECT * FROM data OFFSET 5+7-2").await
 }
 
 #[tokio::test]
 async fn select_with_limit_offset() -> Result<()> {
-    roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await
+    roundtrip("SELECT * FROM data LIMIT 200 OFFSET 10").await?;
+    roundtrip("SELECT * FROM data LIMIT 100+100 OFFSET 20/2").await
 }
 
 #[tokio::test]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to