jiangzhx commented on issue #5635:
URL:
https://github.com/apache/arrow-datafusion/issues/5635#issuecomment-1475086294
cc @BubbaJoe
I wrote a `to_date` UDF myself, and it seems to work correctly:
```rust
use arrow::array::{Date32Array, Date32Builder, StringArray};
use chrono::DateTime;
use datafusion::from_slice::FromSlice;
use datafusion::prelude::*;
use datafusion::{
arrow::{datatypes::DataType, record_batch::RecordBatch},
logical_expr::Volatility,
};
use datafusion_common::{downcast_value, ScalarValue};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;
// create local execution context with an in-memory table
/// Creates a local execution context containing one in-memory table, `t`.
///
/// Table `t` has a single non-nullable `Utf8` column `a` that holds three
/// RFC 3339 timestamp strings.
///
/// # Errors
/// Returns an error if the record batch cannot be built from the schema and
/// column data, or if registering the batch as table `t` fails.
fn create_context() -> Result<SessionContext> {
    use datafusion::arrow::datatypes::{Field, Schema};

    // Define a schema with one non-nullable string column.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

    // Define the data: RFC 3339 timestamps, one per row.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(StringArray::from_slice([
            "2023-03-04T05:01:14.274000+00:00",
            "2023-03-05T05:02:15.274000+00:00",
            "2023-03-06T05:03:16.274000+00:00",
        ]))],
    )?;

    // Declare a new context. In the Spark API, this corresponds to a new Spark SQL session.
    let ctx = SessionContext::new();

    // Declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
    ctx.register_batch("t", batch)?;
    Ok(ctx)
}
#[tokio::main]
async fn main() -> Result<()> {
let ctx = create_context()?;
pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.is_empty() || args.len() > 1 {
return Err(DataFusionError::Internal(format!(
"to_date was called with {} arguments. It requires only 1.",
args.len()
)));
}
match &args[0] {
ColumnarValue::Array(array) => {
let args = downcast_value!(array, StringArray);
let mut builder = Date32Builder::new();
for arg in args {
let date_time = match
DateTime::parse_from_rfc3339(arg.unwrap()) {
Ok(dt) => dt,
Err(e) => {
return
Result::Err(DataFusionError::Internal(e.to_string()));
}
};
builder.append_value((date_time.timestamp() / 86400) as
i32);
}
let date32 = Arc::new(Date32Array::from(builder.finish()));
Ok(ColumnarValue::Array(date32))
}
ColumnarValue::Scalar(ScalarValue::Utf8(v)) => {
let date_time =
match
DateTime::parse_from_rfc3339((v.clone().unwrap()).as_str()) {
Ok(dt) => dt,
Err(e) => {
return
Result::Err(DataFusionError::Internal(e.to_string()));
}
};
let mut builder = Date32Builder::new();
builder.append_value((date_time.timestamp() / 86400) as i32);
let date32 = Arc::new(Date32Array::from(builder.finish()));
Ok(ColumnarValue::Array(date32))
}
_ => {
return Err(DataFusionError::Execution(
"array of `to_date` must be non-null scalar
Utf8".to_string(),
));
}
}
}
// let to_date = make_scalar_function(to_date);
let to_date = create_udf(
"to_date",
vec![DataType::Utf8],
Arc::new(DataType::Date32),
Volatility::Immutable,
Arc::new(to_date),
);
ctx.register_udf(to_date.clone());
let expr = to_date.call(vec![col("a")]);
let df = ctx.table("t").await?;
let df = df.select(vec![expr])?;
df.show().await?;
ctx.sql("SELECT to_date(a) as u from t group by u")
.await?
.show()
.await?;
ctx.sql("SELECT COUNT(*), T, SUM(COUNT(*)) Over (Order By T) From
(SELECT *, to_date(a) as T from t) Group BY T")
.await?
.show()
.await?;
Ok(())
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]