alamb commented on code in PR #9613:
URL: https://github.com/apache/arrow-datafusion/pull/9613#discussion_r1530604266
##########
datafusion-examples/Cargo.toml:
##########
@@ -75,6 +75,6 @@ serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
# 0.10 and 0.11 are incompatible. Need to upgrade tonic to 0.11 when upgrading to arrow 51
Review Comment:
Removed in 456e2fe1892c9f5e91bde791450b07be12b96d9b
##########
datafusion/functions/src/datetime/date_part.rs:
##########
@@ -187,83 +144,46 @@ impl ScalarUDFImpl for DatePartFunc {
}
}
-fn to_ticks<T>(array: &PrimitiveArray<T>, frac: i32) -> Result<Float64Array>
-where
- T: ArrowTemporalType + ArrowNumericType,
- i64: From<T::Native>,
-{
- let zipped = temporal::second(array)?
- .values()
- .iter()
- .zip(temporal::nanosecond(array)?.values().iter())
- .map(|o| (*o.0 as f64 + (*o.1 as f64) / 1_000_000_000.0) * (frac as
f64))
- .collect::<Vec<f64>>();
-
- Ok(Float64Array::from(zipped))
+/// Invoke [`date_part`] and cast the result to Float64
+fn date_part_f64(array: &dyn Array, part: DatePart) -> Result<ArrayRef> {
+ Ok(cast(date_part(array, part)?.as_ref(), &Float64)?)
}
-fn seconds<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
- T: ArrowTemporalType + ArrowNumericType,
- i64: From<T::Native>,
-{
- to_ticks(array, 1)
-}
-
-fn millis<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
- T: ArrowTemporalType + ArrowNumericType,
- i64: From<T::Native>,
-{
- to_ticks(array, 1_000)
-}
-
-fn micros<T>(array: &PrimitiveArray<T>) -> Result<Float64Array>
-where
- T: ArrowTemporalType + ArrowNumericType,
- i64: From<T::Native>,
-{
- to_ticks(array, 1_000_000)
+fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
+ let sf = match unit {
+ Second => 1_f64,
+ Millisecond => 1_000_f64,
+ Microsecond => 1_000_000_f64,
+ Nanosecond => 1_000_000_000_f64,
+ };
+ let secs = date_part(array, DatePart::Second)?;
+ let secs = as_int32_array(secs.as_ref())?;
Review Comment:
Added in f550b64e2
##########
datafusion-examples/examples/deserialize_to_struct.rs:
##########
@@ -15,61 +15,61 @@
// specific language governing permissions and limitations
// under the License.
+use arrow::array::AsArray;
+use arrow::datatypes::{Float64Type, Int32Type};
use datafusion::error::Result;
use datafusion::prelude::*;
-use serde::Deserialize;
+use futures::StreamExt;
/// This example shows that it is possible to convert query results into Rust structs.
-/// It will collect the query results into RecordBatch, then convert it to
serde_json::Value.
-/// Then, serde_json::Value is turned into Rust's struct.
-/// Any datatype with `Deserialize` implemeneted works.
#[tokio::main]
async fn main() -> Result<()> {
let data_list = Data::new().await?;
println!("{data_list:#?}");
Ok(())
}
-#[derive(Deserialize, Debug)]
+#[derive(Debug)]
struct Data {
#[allow(dead_code)]
- int_col: i64,
+ int_col: i32,
#[allow(dead_code)]
double_col: f64,
}
impl Data {
pub async fn new() -> Result<Vec<Self>> {
// this group is almost the same as the one you find in parquet_sql.rs
- let batches = {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::new();
- let testdata = datafusion::test_util::parquet_test_data();
+ let testdata = datafusion::test_util::parquet_test_data();
- ctx.register_parquet(
- "alltypes_plain",
- &format!("{testdata}/alltypes_plain.parquet"),
- ParquetReadOptions::default(),
- )
- .await?;
+ ctx.register_parquet(
+ "alltypes_plain",
+ &format!("{testdata}/alltypes_plain.parquet"),
+ ParquetReadOptions::default(),
+ )
+ .await?;
- let df = ctx
- .sql("SELECT int_col, double_col FROM alltypes_plain")
- .await?;
+ let df = ctx
+ .sql("SELECT int_col, double_col FROM alltypes_plain")
+ .await?;
- df.clone().show().await?;
+ df.clone().show().await?;
- df.collect().await?
- };
- let batches: Vec<_> = batches.iter().collect();
+ let mut stream = df.execute_stream().await?;
+ let mut list = vec![];
+ while let Some(b) = stream.next().await.transpose()? {
+ let int_col = b.column(0).as_primitive::<Int32Type>();
+ let float_col = b.column(1).as_primitive::<Float64Type>();
- // converts it to serde_json type and then convert that into Rust type
Review Comment:
I think that, since this is an example, we can always update / improve it in a
follow-on PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]