houqp commented on a change in pull request #873:
URL: https://github.com/apache/arrow-datafusion/pull/873#discussion_r737069089
##########
File path: datafusion/src/execution/context.rs
##########
@@ -565,6 +573,7 @@ impl ExecutionContext {
/// register_table function.
///
/// Returns an error if no table has been registered with the provided
reference.
+ /// NOTE(kszucs): perhaps it should be called dataframe() instead?
Review comment:
I think naming it dataframe makes sense if it's the only method that's
returning a Dataframe trait object, but this doesn't seem to be the case here.
##########
File path: python/src/errors.rs
##########
@@ -16,10 +16,11 @@
// under the License.
use core::fmt;
+//use std::result::Result;
Review comment:
this can be deleted if not needed anymore?
##########
File path: python/src/functions.rs
##########
@@ -15,232 +15,210 @@
// specific language governing permissions and limitations
// under the License.
-use crate::udaf;
-use crate::udf;
-use crate::{expression, types::PyDataType};
-use datafusion::arrow::datatypes::DataType;
-use datafusion::logical_plan::{self, Literal};
-use datafusion::physical_plan::functions::Volatility;
-use pyo3::{prelude::*, types::PyTuple, wrap_pyfunction, Python};
use std::sync::Arc;
-/// Expression representing a column on the existing plan.
-#[pyfunction]
-#[pyo3(text_signature = "(name)")]
-fn col(name: &str) -> expression::Expression {
- expression::Expression {
- expr: logical_plan::col(name),
- }
-}
-
-/// # A bridge type that converts PyAny data into datafusion literal
-///
-/// Note that the ordering here matters because it has to be from
-/// narrow to wider values because Python has duck typing so putting
-/// Int before Boolean results in a premature match.
-#[derive(FromPyObject)]
-enum PythonLiteral<'a> {
- Boolean(bool),
- Int(i64),
- UInt(u64),
- Float(f64),
- Str(&'a str),
- Binary(&'a [u8]),
-}
+use pyo3::{prelude::*, wrap_pyfunction, Python};
-impl<'a> Literal for PythonLiteral<'a> {
- fn lit(&self) -> logical_plan::Expr {
- match self {
- PythonLiteral::Boolean(val) => val.lit(),
- PythonLiteral::Int(val) => val.lit(),
- PythonLiteral::UInt(val) => val.lit(),
- PythonLiteral::Float(val) => val.lit(),
- PythonLiteral::Str(val) => val.lit(),
- PythonLiteral::Binary(val) => val.lit(),
- }
- }
-}
+use datafusion::arrow::datatypes::DataType;
+use datafusion::logical_plan;
+//use datafusion::logical_plan::Expr;
+use datafusion::physical_plan::functions::Volatility;
+use datafusion::physical_plan::{
+ aggregates::AggregateFunction, functions::BuiltinScalarFunction,
+};
-/// Expression representing a constant value
-#[pyfunction]
-#[pyo3(text_signature = "(value)")]
-fn lit(value: &PyAny) -> PyResult<expression::Expression> {
- let py_lit = value.extract::<PythonLiteral>()?;
- let expr = py_lit.lit();
- Ok(expression::Expression { expr })
-}
+use crate::{
+ expression::{PyAggregateUDF, PyExpr, PyScalarUDF},
+ udaf, udf,
+};
#[pyfunction]
-fn array(value: Vec<expression::Expression>) -> expression::Expression {
- expression::Expression {
+fn array(value: Vec<PyExpr>) -> PyExpr {
+ PyExpr {
expr: logical_plan::array(value.into_iter().map(|x|
x.expr).collect::<Vec<_>>()),
}
}
#[pyfunction]
-fn in_list(
- expr: expression::Expression,
- value: Vec<expression::Expression>,
- negated: bool,
-) -> expression::Expression {
- expression::Expression {
- expr: logical_plan::in_list(
- expr.expr,
- value.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
- negated,
- ),
- }
+fn in_list(expr: PyExpr, value: Vec<PyExpr>, negated: bool) -> PyExpr {
+ logical_plan::in_list(
+ expr.expr,
+ value.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
+ negated,
+ )
+ .into()
}
/// Current date and time
#[pyfunction]
-fn now() -> expression::Expression {
- expression::Expression {
+fn now() -> PyExpr {
+ PyExpr {
// here lit(0) is a stub for conform to arity
expr: logical_plan::now(logical_plan::lit(0)),
}
}
/// Returns a random value in the range 0.0 <= x < 1.0
#[pyfunction]
-fn random() -> expression::Expression {
- expression::Expression {
+fn random() -> PyExpr {
+ PyExpr {
expr: logical_plan::random(),
}
}
/// Computes a binary hash of the given data. type is the algorithm to use.
/// Standard algorithms are md5, sha224, sha256, sha384, sha512, blake2s,
blake2b, and blake3.
#[pyfunction(value, method)]
-fn digest(
- value: expression::Expression,
- method: expression::Expression,
-) -> expression::Expression {
- expression::Expression {
+fn digest(value: PyExpr, method: PyExpr) -> PyExpr {
+ PyExpr {
expr: logical_plan::digest(value.expr, method.expr),
}
}
/// Concatenates the text representations of all the arguments.
/// NULL arguments are ignored.
#[pyfunction(args = "*")]
-fn concat(args: &PyTuple) -> PyResult<expression::Expression> {
- let expressions = expression::from_tuple(args)?;
- let args = expressions.into_iter().map(|e| e.expr).collect::<Vec<_>>();
- Ok(expression::Expression {
- expr: logical_plan::concat(&args),
- })
+fn concat(args: Vec<PyExpr>) -> PyResult<PyExpr> {
+ let args = args.into_iter().map(|e| e.expr).collect::<Vec<_>>();
+ Ok(logical_plan::concat(&args).into())
}
/// Concatenates all but the first argument, with separators.
/// The first argument is used as the separator string, and should not be NULL.
/// Other NULL arguments are ignored.
#[pyfunction(sep, args = "*")]
-fn concat_ws(sep: String, args: &PyTuple) -> PyResult<expression::Expression> {
- let expressions = expression::from_tuple(args)?;
- let args = expressions.into_iter().map(|e| e.expr).collect::<Vec<_>>();
- Ok(expression::Expression {
- expr: logical_plan::concat_ws(sep, &args),
- })
+fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
+ let args = args.into_iter().map(|e| e.expr).collect::<Vec<_>>();
+ Ok(logical_plan::concat_ws(sep, &args).into())
}
-macro_rules! define_unary_function {
- ($NAME: ident) => {
- #[doc = "This function is not documented yet"]
- #[pyfunction]
- fn $NAME(value: expression::Expression) -> expression::Expression {
- expression::Expression {
- expr: logical_plan::$NAME(value.expr),
- }
- }
+macro_rules! scalar_function {
+ ($NAME: ident, $FUNC: ident) => {
+ scalar_function!($NAME, $FUNC, stringify!($NAME));
};
- ($NAME: ident, $DOC: expr) => {
+ ($NAME: ident, $FUNC: ident, $DOC: expr) => {
#[doc = $DOC]
- #[pyfunction]
- fn $NAME(value: expression::Expression) -> expression::Expression {
- expression::Expression {
- expr: logical_plan::$NAME(value.expr),
- }
+ #[pyfunction(args = "*")]
+ fn $NAME(args: Vec<PyExpr>) -> PyExpr {
+ let expr = logical_plan::Expr::ScalarFunction {
+ fun: BuiltinScalarFunction::$FUNC,
+ args: args.into_iter().map(|e| e.into()).collect(),
+ };
+ expr.into()
}
};
}
-define_unary_function!(sqrt, "sqrt");
-define_unary_function!(sin, "sin");
-define_unary_function!(cos, "cos");
-define_unary_function!(tan, "tan");
-define_unary_function!(asin, "asin");
-define_unary_function!(acos, "acos");
-define_unary_function!(atan, "atan");
-define_unary_function!(floor, "floor");
-define_unary_function!(ceil, "ceil");
-define_unary_function!(round, "round");
-define_unary_function!(trunc, "trunc");
-define_unary_function!(abs, "abs");
-define_unary_function!(signum, "signum");
-define_unary_function!(exp, "exp");
-define_unary_function!(ln, "ln");
-define_unary_function!(log2, "log2");
-define_unary_function!(log10, "log10");
+macro_rules! aggregate_function {
+ ($NAME: ident, $FUNC: ident) => {
+ aggregate_function!($NAME, $FUNC, stringify!($NAME));
+ };
+ ($NAME: ident, $FUNC: ident, $DOC: expr) => {
+ #[doc = $DOC]
+ #[pyfunction(args = "*", distinct = "false")]
+ fn $NAME(args: Vec<PyExpr>, distinct: bool) -> PyExpr {
+ let expr = logical_plan::Expr::AggregateFunction {
+ fun: AggregateFunction::$FUNC,
+ args: args.into_iter().map(|e| e.into()).collect(),
+ distinct,
+ };
+ expr.into()
+ }
+ };
+}
-define_unary_function!(ascii, "Returns the numeric code of the first character
of the argument. In UTF8 encoding, returns the Unicode code point of the
character. In other multibyte encodings, the argument must be an ASCII
character.");
-define_unary_function!(sum);
-define_unary_function!(
+scalar_function!(abs, Abs);
+scalar_function!(acos, Acos);
+scalar_function!(ascii, Ascii, "Returns the numeric code of the first
character of the argument. In UTF8 encoding, returns the Unicode code point of
the character. In other multibyte encodings, the argument must be an ASCII
character.");
+scalar_function!(asin, Asin);
+scalar_function!(atan, Atan);
+scalar_function!(
bit_length,
+ BitLength,
"Returns number of bits in the string (8 times the octet_length)."
);
-define_unary_function!(btrim, "Removes the longest string containing only
characters in characters (a space by default) from the start and end of
string.");
-define_unary_function!(
+scalar_function!(btrim, Btrim, "Removes the longest string containing only
characters in characters (a space by default) from the start and end of
string.");
+scalar_function!(ceil, Ceil);
+scalar_function!(
character_length,
+ CharacterLength,
"Returns number of characters in the string."
);
-define_unary_function!(chr, "Returns the character with the given code.");
-define_unary_function!(initcap, "Converts the first letter of each word to
upper case and the rest to lower case. Words are sequences of alphanumeric
characters separated by non-alphanumeric characters.");
-define_unary_function!(left, "Returns first n characters in the string, or
when n is negative, returns all but last |n| characters.");
-define_unary_function!(lower, "Converts the string to all lower case");
-define_unary_function!(lpad, "Extends the string to length length by
prepending the characters fill (a space by default). If the string is already
longer than length then it is truncated (on the right).");
-define_unary_function!(ltrim, "Removes the longest string containing only
characters in characters (a space by default) from the start of string.");
-define_unary_function!(
+scalar_function!(chr, Chr, "Returns the character with the given code.");
+scalar_function!(cos, Cos);
+scalar_function!(exp, Exp);
+scalar_function!(floor, Floor);
+scalar_function!(initcap, InitCap, "Converts the first letter of each word to
upper case and the rest to lower case. Words are sequences of alphanumeric
characters separated by non-alphanumeric characters.");
+scalar_function!(left, Left, "Returns first n characters in the string, or
when n is negative, returns all but last |n| characters.");
+scalar_function!(ln, Ln);
+scalar_function!(log10, Log10);
+scalar_function!(log2, Log2);
+scalar_function!(lower, Lower, "Converts the string to all lower case");
+scalar_function!(lpad, Lpad, "Extends the string to length length by
prepending the characters fill (a space by default). If the string is already
longer than length then it is truncated (on the right).");
+scalar_function!(ltrim, Ltrim, "Removes the longest string containing only
characters in characters (a space by default) from the start of string.");
+scalar_function!(
md5,
+ MD5,
"Computes the MD5 hash of the argument, with the result written in
hexadecimal."
);
-define_unary_function!(octet_length, "Returns number of bytes in the string.
Since this version of the function accepts type character directly, it will not
strip trailing spaces.");
-define_unary_function!(
- replace,
- "Replaces all occurrences in string of substring from with substring to."
-);
-define_unary_function!(repeat, "Repeats string the specified number of
times.");
-define_unary_function!(
+scalar_function!(octet_length, OctetLength, "Returns number of bytes in the
string. Since this version of the function accepts type character directly, it
will not strip trailing spaces.");
+scalar_function!(regexp_match, RegexpMatch);
+scalar_function!(
regexp_replace,
+ RegexpReplace,
"Replaces substring(s) matching a POSIX regular expression"
);
-define_unary_function!(
+scalar_function!(
+ repeat,
+ Repeat,
+ "Repeats string the specified number of times."
+);
+scalar_function!(
+ replace,
+ Replace,
+ "Replaces all occurrences in string of substring from with substring to."
+);
+scalar_function!(
reverse,
+ Reverse,
"Reverses the order of the characters in the string."
);
-define_unary_function!(right, "Returns last n characters in the string, or
when n is negative, returns all but first |n| characters.");
-define_unary_function!(rpad, "Extends the string to length length by appending
the characters fill (a space by default). If the string is already longer than
length then it is truncated.");
-define_unary_function!(rtrim, "Removes the longest string containing only
characters in characters (a space by default) from the end of string.");
-define_unary_function!(sha224);
-define_unary_function!(sha256);
-define_unary_function!(sha384);
-define_unary_function!(sha512);
-define_unary_function!(split_part, "Splits string at occurrences of delimiter
and returns the n'th field (counting from one).");
-define_unary_function!(starts_with, "Returns true if string starts with
prefix.");
-define_unary_function!(strpos,"Returns starting index of specified substring
within string, or zero if it's not present. (Same as position(substring in
string), but note the reversed argument order.)");
-define_unary_function!(substr);
-define_unary_function!(
+scalar_function!(right, Right, "Returns last n characters in the string, or
when n is negative, returns all but first |n| characters.");
+scalar_function!(round, Round);
+scalar_function!(rpad, Rpad, "Extends the string to length length by appending
the characters fill (a space by default). If the string is already longer than
length then it is truncated.");
+scalar_function!(rtrim, Rtrim, "Removes the longest string containing only
characters in characters (a space by default) from the end of string.");
+scalar_function!(sha224, SHA224);
+scalar_function!(sha256, SHA256);
+scalar_function!(sha384, SHA384);
+scalar_function!(sha512, SHA512);
+scalar_function!(signum, Signum);
+scalar_function!(sin, Sin);
+scalar_function!(split_part, SplitPart, "Splits string at occurrences of
delimiter and returns the n'th field (counting from one).");
+scalar_function!(sqrt, Sqrt);
+scalar_function!(
+ starts_with,
+ StartsWith,
+ "Returns true if string starts with prefix."
+);
+scalar_function!(strpos, Strpos, "Returns starting index of specified
substring within string, or zero if it's not present. (Same as
position(substring in string), but note the reversed argument order.)");
+scalar_function!(substr, Substr);
+scalar_function!(tan, Tan);
+scalar_function!(
to_hex,
+ ToHex,
"Converts the number to its equivalent hexadecimal representation."
);
-define_unary_function!(translate, "Replaces each character in string that
matches a character in the from set with the corresponding character in the to
set. If from is longer than to, occurrences of the extra characters in from are
deleted.");
-define_unary_function!(trim, "Removes the longest string containing only
characters in characters (a space by default) from the start, end, or both ends
(BOTH is the default) of string.");
-define_unary_function!(upper, "Converts the string to all upper case.");
-define_unary_function!(avg);
-define_unary_function!(min);
-define_unary_function!(max);
-define_unary_function!(count);
-define_unary_function!(approx_distinct);
+scalar_function!(to_timestamp, ToTimestamp);
+scalar_function!(translate, Translate, "Replaces each character in string that
matches a character in the from set with the corresponding character in the to
set. If from is longer than to, occurrences of the extra characters in from are
deleted.");
+scalar_function!(trim, Trim, "Removes the longest string containing only
characters in characters (a space by default) from the start, end, or both ends
(BOTH is the default) of string.");
+scalar_function!(trunc, Trunc);
+scalar_function!(upper, Upper, "Converts the string to all upper case.");
+
+aggregate_function!(avg, Avg);
+aggregate_function!(count, Count);
+aggregate_function!(max, Max);
+aggregate_function!(min, Min);
+aggregate_function!(sum, Sum);
Review comment:
I think the recently added `approx_distinct` function got missed from
the merge.
##########
File path: python/src/dataframe.rs
##########
@@ -15,174 +15,94 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
-use logical_plan::LogicalPlan;
-use pyo3::{prelude::*, types::PyTuple};
+use pyo3::prelude::*;
use tokio::runtime::Runtime;
-use datafusion::execution::context::ExecutionContext as _ExecutionContext;
-use datafusion::logical_plan::{JoinType, LogicalPlanBuilder};
-use datafusion::physical_plan::collect;
-use datafusion::{execution::context::ExecutionContextState, logical_plan};
-
-use crate::{errors, to_py};
-use crate::{errors::DataFusionError, expression};
+use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::pyarrow::PyArrowConvert;
use datafusion::arrow::util::pretty;
+use datafusion::dataframe::DataFrame;
+use datafusion::logical_plan::JoinType;
+
+use crate::{errors::DataFusionError, expression::PyExpr};
-/// A DataFrame is a representation of a logical plan and an API to compose
statements.
+/// A PyDataFrame is a representation of a logical plan and an API to compose
statements.
/// Use it to build a plan and `.collect()` to execute the plan and collect
the result.
/// The actual execution of a plan runs natively on Rust and Arrow on a
multi-threaded environment.
-#[pyclass]
-pub(crate) struct DataFrame {
- ctx_state: Arc<Mutex<ExecutionContextState>>,
- plan: LogicalPlan,
+#[pyclass(name = "DataFrame")]
+#[derive(Clone)]
+pub(crate) struct PyDataFrame {
+ df: Arc<dyn DataFrame>,
}
-impl DataFrame {
- /// creates a new DataFrame
- pub fn new(ctx_state: Arc<Mutex<ExecutionContextState>>, plan:
LogicalPlan) -> Self {
- Self { ctx_state, plan }
+impl PyDataFrame {
+ /// creates a new PyDataFrame
+ pub fn new(df: Arc<dyn DataFrame>) -> Self {
+ Self { df }
}
}
#[pymethods]
-impl DataFrame {
- /// Select `expressions` from the existing DataFrame.
- #[args(args = "*")]
- fn select(&self, args: &PyTuple) -> PyResult<Self> {
- let expressions = expression::from_tuple(args)?;
- let builder = LogicalPlanBuilder::from(self.plan.clone());
- let builder =
- errors::wrap(builder.project(expressions.into_iter().map(|e|
e.expr)))?;
- let plan = errors::wrap(builder.build())?;
-
- Ok(DataFrame {
- ctx_state: self.ctx_state.clone(),
- plan,
- })
+impl PyDataFrame {
+ /// Returns the schema from the logical plan
+ fn schema(&self) -> Schema {
+ self.df.schema().into()
}
- /// Filter according to the `predicate` expression
- fn filter(&self, predicate: expression::Expression) -> PyResult<Self> {
- let builder = LogicalPlanBuilder::from(self.plan.clone());
- let builder = errors::wrap(builder.filter(predicate.expr))?;
- let plan = errors::wrap(builder.build())?;
+ #[args(args = "*")]
+ fn select(&self, args: Vec<PyExpr>) -> PyResult<Self> {
+ let expr = args.into_iter().map(|e| e.into()).collect();
+ let df = self.df.select(expr)?;
+ Ok(Self::new(df))
+ }
- Ok(DataFrame {
- ctx_state: self.ctx_state.clone(),
- plan,
- })
+ fn filter(&self, predicate: PyExpr) -> PyResult<Self> {
+ let df = self.df.filter(predicate.into())?;
+ Ok(Self::new(df))
}
- /// Aggregates using expressions
- fn aggregate(
- &self,
- group_by: Vec<expression::Expression>,
- aggs: Vec<expression::Expression>,
- ) -> PyResult<Self> {
- let builder = LogicalPlanBuilder::from(self.plan.clone());
- let builder = errors::wrap(builder.aggregate(
- group_by.into_iter().map(|e| e.expr),
- aggs.into_iter().map(|e| e.expr),
- ))?;
- let plan = errors::wrap(builder.build())?;
-
- Ok(DataFrame {
- ctx_state: self.ctx_state.clone(),
- plan,
- })
+ fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) ->
PyResult<Self> {
+ let group_by = group_by.into_iter().map(|e| e.into()).collect();
+ let aggs = aggs.into_iter().map(|e| e.into()).collect();
+ let df = self.df.aggregate(group_by, aggs)?;
+ Ok(Self::new(df))
}
- /// Sort by specified sorting expressions
- fn sort(&self, exprs: Vec<expression::Expression>) -> PyResult<Self> {
- let exprs = exprs.into_iter().map(|e| e.expr);
- let builder = LogicalPlanBuilder::from(self.plan.clone());
- let builder = errors::wrap(builder.sort(exprs))?;
- let plan = errors::wrap(builder.build())?;
- Ok(DataFrame {
- ctx_state: self.ctx_state.clone(),
- plan,
- })
+ #[args(exprs = "*")]
+ fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
+ let exprs = exprs.into_iter().map(|e| e.into()).collect();
+ let df = self.df.sort(exprs)?;
+ Ok(Self::new(df))
}
- /// Limits the plan to return at most `count` rows
fn limit(&self, count: usize) -> PyResult<Self> {
- let builder = LogicalPlanBuilder::from(self.plan.clone());
- let builder = errors::wrap(builder.limit(count))?;
- let plan = errors::wrap(builder.build())?;
-
- Ok(DataFrame {
- ctx_state: self.ctx_state.clone(),
- plan,
- })
+ let df = self.df.limit(count)?;
+ Ok(Self::new(df))
}
/// Executes the plan, returning a list of `RecordBatch`es.
- /// Unless some order is specified in the plan, there is no guarantee of
the order of the result
- fn collect(&self, py: Python) -> PyResult<PyObject> {
- let ctx = _ExecutionContext::from(self.ctx_state.clone());
+ /// Unless some order is specified in the plan, there is no
+ /// guarantee of the order of the result.
+ fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
let rt = Runtime::new().unwrap();
- let plan = ctx
- .optimize(&self.plan)
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
-
- let plan = py.allow_threads(|| {
- rt.block_on(async {
- ctx.create_physical_plan(&plan)
- .await
- .map_err(|e| -> errors::DataFusionError { e.into() })
- })
- })?;
-
- let batches = py.allow_threads(|| {
- rt.block_on(async {
- collect(plan)
- .await
- .map_err(|e| -> errors::DataFusionError { e.into() })
- })
- })?;
- to_py::to_py(&batches)
+ let batches = py.allow_threads(|| rt.block_on(self.df.collect()))?;
+ // cannot use PyResult<Vec<RecordBatch>> return type due to
+ // https://github.com/PyO3/pyo3/issues/1813
+ batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
}
/// Print the result, 20 lines by default
#[args(num = "20")]
fn show(&self, py: Python, num: usize) -> PyResult<()> {
- let ctx = _ExecutionContext::from(self.ctx_state.clone());
let rt = Runtime::new().unwrap();
- let plan = py.allow_threads(|| {
- rt.block_on(async {
- let l_plan = ctx
- .optimize(&self.limit(num)?.plan)
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
- let p_plan = ctx
- .create_physical_plan(&l_plan)
- .await
- .map_err(|e| -> errors::DataFusionError { e.into() })?;
- Ok::<_, PyErr>(p_plan)
- })
- })?;
-
- let batches = py.allow_threads(|| {
- rt.block_on(async {
- collect(plan)
- .await
- .map_err(|e| -> errors::DataFusionError { e.into() })
- })
- })?;
-
- Ok(pretty::print_batches(&batches).unwrap())
+ let df = self.df.limit(num)?;
+ let batches = py.allow_threads(|| rt.block_on(df.collect()))?;
+ Ok(pretty::print_batches(&batches)?)
}
- /// Returns the join of two DataFrames `on`.
- fn join(
- &self,
- right: &DataFrame,
- join_keys: (Vec<&str>, Vec<&str>),
- how: &str,
- ) -> PyResult<Self> {
- let builder = LogicalPlanBuilder::from(self.plan.clone());
-
+ fn join(&self, right: PyDataFrame, on: Vec<&str>, how: &str) ->
PyResult<Self> {
Review comment:
Any reason why we changed from `join_keys: (Vec<&str>, Vec<&str>)` to
`on: Vec<&str>`? We changed to `(Vec<&str>, Vec<&str>)` previously so that we
could join on different keys from two dataframes.
We could change it to `left_cols, right_cols` to be consistent with
`Dataframe::join` if that makes it easier.
##########
File path: python/src/lib.rs
##########
@@ -17,42 +17,45 @@
use pyo3::prelude::*;
+mod catalog;
mod context;
mod dataframe;
mod errors;
mod expression;
mod functions;
-mod scalar;
-mod to_py;
-mod to_rust;
-mod types;
mod udaf;
mod udf;
+// TODO(kszucs): remvoe
// taken from https://github.com/PyO3/pyo3/issues/471
-fn register_module_package(py: Python, package_name: &str, module: &PyModule) {
- py.import("sys")
- .expect("failed to import python sys module")
- .dict()
- .get_item("modules")
- .expect("failed to get python modules dictionary")
- .downcast::<pyo3::types::PyDict>()
- .expect("failed to turn sys.modules into a PyDict")
- .set_item(package_name, module)
- .expect("failed to inject module");
-}
+// fn register_module_package(py: Python, package_name: &str, module:
&PyModule) {
+// py.import("sys")
+// .expect("failed to import python sys module")
+// .dict()
+// .get_item("modules")
+// .expect("failed to get python modules dictionary")
+// .downcast::<pyo3::types::PyDict>()
+// .expect("failed to turn sys.modules into a PyDict")
+// .set_item(package_name, module)
+// .expect("failed to inject module");
+// }
/// DataFusion.
#[pymodule]
-fn datafusion(py: Python, m: &PyModule) -> PyResult<()> {
- m.add_class::<context::ExecutionContext>()?;
- m.add_class::<dataframe::DataFrame>()?;
- m.add_class::<expression::Expression>()?;
+fn internals(py: Python, m: &PyModule) -> PyResult<()> {
+ expression::init(m)?;
+
+ //register_module_package(py, "datafusion.functions", functions);
Review comment:
this was needed to make `from datafusion.functions import trim` to work
for the documentation build, any particular reason why we want to remove it
here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]