This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion-python.git
The following commit(s) were added to refs/heads/master by this push:
new 56df088 [DataFrame] - Add repartition function for dataframe (#37)
56df088 is described below
commit 56df088e56db7eca4a789d7fa38da31fe4ebd886
Author: Francis Du <[email protected]>
AuthorDate: Thu Sep 8 11:12:21 2022 +0800
[DataFrame] - Add repartition function for dataframe (#37)
* feat: Add repartition function for dataframe
* fix: python linter
---
datafusion/tests/test_dataframe.py | 8 ++++++++
src/dataframe.rs | 15 +++++++++++++++
2 files changed, 23 insertions(+)
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 30506a7..e739465 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -236,3 +236,11 @@ def test_explain(df):
column("a") - column("b"),
)
df.explain()
+
+
+def test_repartition(df):
+ df.repartition(2)
+
+
+def test_repartition_by_hash(df):
+ df.repartition_by_hash(column("a"), num=2)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 87f38c1..f9f2f66 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -22,6 +22,7 @@ use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::util::pretty;
use datafusion::dataframe::DataFrame;
use datafusion::logical_plan::JoinType;
+use datafusion::prelude::*;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::types::PyTuple;
@@ -170,4 +171,18 @@ impl PyDataFrame {
let batches = wait_for_future(py, df.collect())?;
Ok(pretty::print_batches(&batches)?)
}
+
+ /// Repartition a `DataFrame` based on a logical partitioning scheme.
+ fn repartition(&self, num: usize) -> PyResult<Self> {
+ let new_df = self.df.repartition(Partitioning::RoundRobinBatch(num))?;
+ Ok(Self::new(new_df))
+ }
+
+ /// Repartition a `DataFrame` based on a logical partitioning scheme.
+ #[args(args = "*", num)]
+ fn repartition_by_hash(&self, args: Vec<PyExpr>, num: usize) -> PyResult<Self> {
+ let expr = args.into_iter().map(|py_expr| py_expr.into()).collect();
+ let new_df = self.df.repartition(Partitioning::Hash(expr, num))?;
+ Ok(Self::new(new_df))
+ }
}