This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 56df088  [DataFrame] - Add repartition function for dataframe (#37)
56df088 is described below

commit 56df088e56db7eca4a789d7fa38da31fe4ebd886
Author: Francis Du <[email protected]>
AuthorDate: Thu Sep 8 11:12:21 2022 +0800

    [DataFrame] - Add repartition function for dataframe (#37)
    
    * feat: Add repartition function for dataframe
    
    * fix: python linter
---
 datafusion/tests/test_dataframe.py |  8 ++++++++
 src/dataframe.rs                   | 15 +++++++++++++++
 2 files changed, 23 insertions(+)

diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 30506a7..e739465 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -236,3 +236,11 @@ def test_explain(df):
         column("a") - column("b"),
     )
     df.explain()
+
+
+def test_repartition(df):
+    df.repartition(2)
+
+
+def test_repartition_by_hash(df):
+    df.repartition_by_hash(column("a"), num=2)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 87f38c1..f9f2f66 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -22,6 +22,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert;
 use datafusion::arrow::util::pretty;
 use datafusion::dataframe::DataFrame;
 use datafusion::logical_plan::JoinType;
+use datafusion::prelude::*;
 use pyo3::exceptions::PyTypeError;
 use pyo3::prelude::*;
 use pyo3::types::PyTuple;
@@ -170,4 +171,18 @@ impl PyDataFrame {
         let batches = wait_for_future(py, df.collect())?;
         Ok(pretty::print_batches(&batches)?)
     }
+
+    /// Repartition a `DataFrame` based on a logical partitioning scheme.
+    fn repartition(&self, num: usize) -> PyResult<Self> {
+        let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?;
+        Ok(Self::new(new_df))
+    }
+
+    /// Repartition a `DataFrame` based on a logical partitioning scheme.
+    #[args(args = "*", num)]
+    fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyResult<Self> {
+        let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
+        let new_df = self.df.repartition(Partitioning::Hash(expr, num))?;
+        Ok(Self::new(new_df))
+    }
 }

Reply via email to