This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7093bbdff0 feat(substrait): add intersect support to consumer (#12830)
7093bbdff0 is described below

commit 7093bbdff055e165c4640a0960aa0f1018368e23
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Oct 11 00:29:31 2024 +0400

    feat(substrait): add intersect support to consumer (#12830)
---
 datafusion/substrait/src/logical_plan/consumer.rs  |  11 ++
 .../tests/cases/roundtrip_logical_plan.rs          |  27 +++++
 .../testdata/test_plans/intersect.substrait.json   | 118 +++++++++++++++++++++
 3 files changed, 156 insertions(+)

diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs
index 030536f9f8..8884807749 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -887,6 +887,17 @@ pub async fn from_substrait_rel(
                         not_impl_err!("Union relation requires at least one input")
                     }
                 }
+                set_rel::SetOp::IntersectionPrimary => {
+                    if set.inputs.len() == 2 {
+                        LogicalPlanBuilder::intersect(
+                            from_substrait_rel(ctx, &set.inputs[0], extensions).await?,
+                            from_substrait_rel(ctx, &set.inputs[1], extensions).await?,
+                            false,
+                        )
+                    } else {
+                        not_impl_err!("Primary Intersect relation with more than two inputs isn't supported")
+                    }
+                }
                 _ => not_impl_err!("Unsupported set operator: {set_op:?}"),
             },
             Err(e) => not_impl_err!("Invalid set operation type {}: {e}", set.op),
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index ce6d1825cd..80caaafad6 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::utils::test::read_json;
 use datafusion::arrow::array::ArrayRef;
 use datafusion::physical_plan::Accumulator;
 use datafusion::scalar::ScalarValue;
@@ -663,6 +664,17 @@ async fn simple_intersect() -> Result<()> {
         .await
 }
 
+#[tokio::test]
+async fn simple_intersect_consume() -> Result<()> {
+    let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json");
+
+    assert_substrait_sql(
+        proto_plan,
+        "SELECT a FROM data INTERSECT SELECT a FROM data2",
+    )
+    .await
+}
+
 #[tokio::test]
 async fn simple_intersect_table_reuse() -> Result<()> {
     // Substrait does currently NOT maintain the alias of the tables.
@@ -1111,6 +1123,21 @@ async fn assert_expected_plan(
     Ok(())
 }
 
+async fn assert_substrait_sql(substrait_plan: Plan, sql: &str) -> Result<()> {
+    let ctx = create_context().await?;
+
+    let expected = ctx.sql(sql).await?.into_optimized_plan()?;
+
+    let plan = from_substrait_plan(&ctx, &substrait_plan).await?;
+    let plan = ctx.state().optimize(&plan)?;
+
+    let planstr = format!("{plan}");
+    let expectedstr = format!("{expected}");
+    assert_eq!(planstr, expectedstr);
+
+    Ok(())
+}
+
 async fn roundtrip_fill_na(sql: &str) -> Result<()> {
     let ctx = create_context().await?;
     let df = ctx.sql(sql).await?;
diff --git a/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json b/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json
new file mode 100644
index 0000000000..b9a2e4ad14
--- /dev/null
+++ b/datafusion/substrait/tests/testdata/test_plans/intersect.substrait.json
@@ -0,0 +1,118 @@
+{
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "set": {
+            "inputs": [
+              {
+                "project": {
+                  "common": {
+                    "emit": {
+                      "outputMapping": [
+                        1
+                      ]
+                    }
+                  },
+                  "input": {
+                    "read": {
+                      "common": {
+                        "direct": {}
+                      },
+                      "baseSchema": {
+                        "names": [
+                          "a"
+                        ],
+                        "struct": {
+                          "types": [
+                            {
+                              "i64": {
+                                "nullability": "NULLABILITY_NULLABLE"
+                              }
+                            }
+                          ],
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      "namedTable": {
+                        "names": [
+                          "data"
+                        ]
+                      }
+                    }
+                  },
+                  "expressions": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {}
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+                }
+              },
+              {
+                "project": {
+                  "common": {
+                    "emit": {
+                      "outputMapping": [
+                        1
+                      ]
+                    }
+                  },
+                  "input": {
+                    "read": {
+                      "common": {
+                        "direct": {}
+                      },
+                      "baseSchema": {
+                        "names": [
+                          "a"
+                        ],
+                        "struct": {
+                          "types": [
+                            {
+                              "i64": {
+                                "nullability": "NULLABILITY_NULLABLE"
+                              }
+                            }
+                          ],
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      "namedTable": {
+                        "names": [
+                          "data2"
+                        ]
+                      }
+                    }
+                  },
+                  "expressions": [
+                    {
+                      "selection": {
+                        "directReference": {
+                          "structField": {}
+                        },
+                        "rootReference": {}
+                      }
+                    }
+                  ]
+                }
+              }
+            ],
+            "op": "SET_OP_INTERSECTION_PRIMARY"
+          }
+        },
+        "names": [
+          "a"
+        ]
+      }
+    }
+  ],
+  "version": {
+    "minorNumber": 54,
+    "producer": "subframe"
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to