jiangzhx commented on issue #5635:
URL: 
https://github.com/apache/arrow-datafusion/issues/5635#issuecomment-1475086294

   cc @BubbaJoe 
   I wrote a `to_date` UDF myself, and it seems to work correctly.
   
   ```
   
   use arrow::array::{Date32Array, Date32Builder, StringArray};
   use chrono::DateTime;
   use datafusion::from_slice::FromSlice;
   use datafusion::prelude::*;
   use datafusion::{
       arrow::{datatypes::DataType, record_batch::RecordBatch},
       logical_expr::Volatility,
   };
   use datafusion_common::{downcast_value, ScalarValue};
   use datafusion_common::{DataFusionError, Result};
   use datafusion_expr::ColumnarValue;
   use std::sync::Arc;
   
   /// Builds a [`SessionContext`] with a single in-memory table registered as `t`.
   ///
   /// The table has one non-nullable `Utf8` column named `a` holding three
   /// RFC 3339 timestamp strings.
   ///
   /// # Errors
   ///
   /// Propagates any error returned while building the record batch or while
   /// registering it with the context.
   fn create_context() -> Result<SessionContext> {
       use datafusion::arrow::datatypes::{Field, Schema};

       // Schema: one non-nullable string column.
       let column = Field::new("a", DataType::Utf8, false);
       let schema = Arc::new(Schema::new(vec![column]));

       // Sample rows, one timestamp per day.
       let timestamps = StringArray::from_slice([
           "2023-03-04T05:01:14.274000+00:00",
           "2023-03-05T05:02:15.274000+00:00",
           "2023-03-06T05:03:16.274000+00:00",
       ]);
       let batch = RecordBatch::try_new(schema, vec![Arc::new(timestamps)])?;

       // A fresh context (the Spark analogue is a new SQL session) with the
       // batch exposed as table `t` (the Spark analogue is createDataFrame).
       let session = SessionContext::new();
       session.register_batch("t", batch)?;
       Ok(session)
   }
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let ctx = create_context()?;
   
       pub fn to_date(args: &[ColumnarValue]) -> Result<ColumnarValue> {
           if args.is_empty() || args.len() > 1 {
               return Err(DataFusionError::Internal(format!(
                   "to_date was called with {} arguments. It requires only 1.",
                   args.len()
               )));
           }
   
           match &args[0] {
               ColumnarValue::Array(array) => {
                   let args = downcast_value!(array, StringArray);
                   let mut builder = Date32Builder::new();
   
                   for arg in args {
                       let date_time = match 
DateTime::parse_from_rfc3339(arg.unwrap()) {
                           Ok(dt) => dt,
                           Err(e) => {
                               return 
Result::Err(DataFusionError::Internal(e.to_string()));
                           }
                       };
                       builder.append_value((date_time.timestamp() / 86400) as 
i32);
                   }
                   let date32 = Arc::new(Date32Array::from(builder.finish()));
   
                   Ok(ColumnarValue::Array(date32))
               }
               ColumnarValue::Scalar(ScalarValue::Utf8(v)) => {
                   let date_time =
                       match 
DateTime::parse_from_rfc3339((v.clone().unwrap()).as_str()) {
                           Ok(dt) => dt,
                           Err(e) => {
                               return 
Result::Err(DataFusionError::Internal(e.to_string()));
                           }
                       };
                   let mut builder = Date32Builder::new();
   
                   builder.append_value((date_time.timestamp() / 86400) as i32);
                   let date32 = Arc::new(Date32Array::from(builder.finish()));
   
                   Ok(ColumnarValue::Array(date32))
               }
               _ => {
                   return Err(DataFusionError::Execution(
                       "array of `to_date` must be non-null scalar 
Utf8".to_string(),
                   ));
               }
           }
       }
   
       // let to_date = make_scalar_function(to_date);
       let to_date = create_udf(
           "to_date",
           vec![DataType::Utf8],
           Arc::new(DataType::Date32),
           Volatility::Immutable,
           Arc::new(to_date),
       );
   
       ctx.register_udf(to_date.clone()); 
   
       let expr = to_date.call(vec![col("a")]);
       let df = ctx.table("t").await?;
       let df = df.select(vec![expr])?;
       df.show().await?;
   
       ctx.sql("SELECT to_date(a) as u from t group by u")
           .await?
           .show()
           .await?;
   
       ctx.sql("SELECT COUNT(*), T, SUM(COUNT(*)) Over (Order By T) From 
(SELECT *, to_date(a) as T from t) Group BY T")
           .await?
           .show()
           .await?;
       Ok(())
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to