This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7093bbdff0 feat(substrait): add intersect support to consumer (#12830)
7093bbdff0 is described below
commit 7093bbdff055e165c4640a0960aa0f1018368e23
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Oct 11 00:29:31 2024 +0400
feat(substrait): add intersect support to consumer (#12830)
---
datafusion/substrait/src/logical_plan/consumer.rs | 11 ++
.../tests/cases/roundtrip_logical_plan.rs | 27 +++++
.../testdata/test_plans/intersect.substrait.json | 118 +++++++++++++++++++++
3 files changed, 156 insertions(+)
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 030536f9f8..8884807749 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -887,6 +887,17 @@ pub async fn from_substrait_rel(
not_impl_err!("Union relation requires at least one input")
}
}
+ set_rel::SetOp::IntersectionPrimary => {
+ if set.inputs.len() == 2 {
+ LogicalPlanBuilder::intersect(
+ from_substrait_rel(ctx, &set.inputs[0], extensions).await?,
+ from_substrait_rel(ctx, &set.inputs[1], extensions).await?,
+ false,
+ )
+ } else {
+ not_impl_err!("Primary Intersect relation with more than two inputs isn't supported")
+ }
+ }
_ => not_impl_err!("Unsupported set operator: {set_op:?}"),
},
Err(e) => not_impl_err!("Invalid set operation type {}: {e}", set.op),
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index ce6d1825cd..80caaafad6 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::utils::test::read_json;
use datafusion::arrow::array::ArrayRef;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;
@@ -663,6 +664,17 @@ async fn simple_intersect() -> Result<()> {
.await
}
+#[tokio::test]
+async fn simple_intersect_consume() -> Result<()> {
+ let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json");
+
+ assert_substrait_sql(
+ proto_plan,
+ "SELECT a FROM data INTERSECT SELECT a FROM data2",
+ )
+ .await
+}
+
#[tokio::test]
async fn simple_intersect_table_reuse() -> Result<()> {
// Substrait does currently NOT maintain the alias of the tables.
@@ -1111,6 +1123,21 @@ async fn assert_expected_plan(
Ok(())
}
+async fn assert_substrait_sql(substrait_plan: Plan, sql: &str) -> Result<()> {
+ let ctx = create_context().await?;
+
+ let expected = ctx.sql(sql).await?.into_optimized_plan()?;
+
+ let plan = from_substrait_plan(&ctx, &substrait_plan).await?;
+ let plan = ctx.state().optimize(&plan)?;
+
+ let planstr = format!("{plan}");
+ let expectedstr = format!("{expected}");
+ assert_eq!(planstr, expectedstr);
+
+ Ok(())
+}
+
async fn roundtrip_fill_na(sql: &str) -> Result<()> {
let ctx = create_context().await?;
let df = ctx.sql(sql).await?;
diff --git a/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json b/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json
new file mode 100644
index 0000000000..b9a2e4ad14
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json
@@ -0,0 +1,118 @@
+{
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "set": {
+ "inputs": [
+ {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [
+ 1
+ ]
+ }
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "a"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ "namedTable": {
+ "names": [
+ "data"
+ ]
+ }
+ }
+ },
+ "expressions": [
+ {
+ "selection": {
+ "directReference": {
+ "structField": {}
+ },
+ "rootReference": {}
+ }
+ }
+ ]
+ }
+ },
+ {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [
+ 1
+ ]
+ }
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "a"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ "namedTable": {
+ "names": [
+ "data2"
+ ]
+ }
+ }
+ },
+ "expressions": [
+ {
+ "selection": {
+ "directReference": {
+ "structField": {}
+ },
+ "rootReference": {}
+ }
+ }
+ ]
+ }
+ }
+ ],
+ "op": "SET_OP_INTERSECTION_PRIMARY"
+ }
+ },
+ "names": [
+ "a"
+ ]
+ }
+ }
+ ],
+ "version": {
+ "minorNumber": 54,
+ "producer": "subframe"
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]