tustvold commented on code in PR #7401:
URL: https://github.com/apache/arrow-datafusion/pull/7401#discussion_r1310019196


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = col_b_init.clone();
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // stop the stream at 20 batch now.
+            // Need to figure out how all the columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii = col_b_ascii + 2;
+            }
+
+            counter = counter + 1;
+            
+            // building col `a`
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                // values_vector.push(rand::thread_rng().gen_range(1..=1000));

Review Comment:
   Commented-out code?



##########
datafusion/core/src/physical_plan/wrapper.rs:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use arrow::row::{SortField, Rows};
+use arrow::datatypes::DataType;
+use arrow::error::ArrowError;
+use arrow_array::*;
+use arrow_array::types::*;
+use arrow::row::RowConverter;
+
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+macro_rules! downcast_dict {
+    ($array:ident, $key:ident) => {{
+        $array
+            .as_any()
+            .downcast_ref::<DictionaryArray<$key>>()
+            .unwrap()
+    }};
+}
+
+#[derive(Debug)]
+pub struct CardinalityAwareRowConverter {
+    fields: Vec<SortField>,
+    inner: Option<RowConverter>,
+    done: bool,
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
+        Ok(Self {
+            fields: fields.clone(),
+            inner: None,
+            done: false,
+        })
+    }
+    
+    pub fn size(&self) -> usize {
+        return self.inner.as_ref().unwrap().size();
+    }
+
+    pub fn convert_rows(&self, rows: &Rows) -> Result<Vec<ArrayRef>, ArrowError> {
+        self.inner.as_ref().unwrap().convert_rows(rows)
+    }
+
+    pub fn convert_columns(
+        &mut self,
+        columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
+        
+        if !self.done {
+            for (i, col) in columns.iter().enumerate() {
+                if let DataType::Dictionary(k, _) = col.data_type() {

Review Comment:
   This could make use of https://docs.rs/arrow-array/latest/arrow_array/cast/trait.AsArray.html#tymethod.as_any_dictionary_opt to avoid the downcasting
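
   For illustration, a minimal sketch of what `as_any_dictionary_opt` enables (the helper name is hypothetical and this is not the PR's logic, just the type-erased access pattern):

   ```rust
   use arrow_array::{cast::AsArray, ArrayRef};

   /// Returns the number of distinct dictionary values, or `None` for
   /// non-dictionary columns. `as_any_dictionary_opt` yields a type-erased
   /// view over any `DictionaryArray<K>`, so no per-key-type downcast
   /// (and no `downcast_dict!`-style macro) is needed.
   fn dictionary_cardinality(col: &ArrayRef) -> Option<usize> {
       col.as_any_dictionary_opt().map(|dict| dict.values().len())
   }
   ```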



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = col_b_init.clone();
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // stop the stream at 20 batch now.
+            // Need to figure out how all the columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii = col_b_ascii + 2;
+            }
+
+            counter = counter + 1;
+            
+            // building col `a`
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                // values_vector.push(rand::thread_rng().gen_range(1..=1000));
+                values_vector.push(String::from(Uuid::new_v4().to_string()));

Review Comment:
   This is likely quite an expensive way to generate data, as it will use a secure random number generator. There should be examples of sampling Alphanumeric within the codebase.
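
   For reference, a minimal sketch of the suggested alternative, assuming rand 0.8 (the helper name is illustrative):

   ```rust
   use rand::{distributions::Alphanumeric, Rng};

   /// Samples `len` alphanumeric characters from the thread-local RNG,
   /// which is much cheaper than formatting a fresh UUIDv4 per value.
   fn random_string(len: usize) -> String {
       rand::thread_rng()
           .sample_iter(&Alphanumeric)
           .take(len)
           .map(char::from) // `Alphanumeric` yields `u8` in rand 0.8
           .collect()
   }
   ```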



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = col_b_init.clone();
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // stop the stream at 20 batch now.
+            // Need to figure out how all the columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii = col_b_ascii + 2;
+            }
+
+            counter = counter + 1;
+            
+            // building col `a`
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                // values_vector.push(rand::thread_rng().gen_range(1..=1000));
+                values_vector.push(String::from(Uuid::new_v4().to_string()));
+            }
+            let values = StringArray::from(values_vector);
+
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+            // building col `b`
+            let mut values: Vec<u32> = Vec::new();
+            for _i in 1..=8192 {
+                // let ascii_value = rand::thread_rng().gen_range(97..=110);
+                // values.push(String::from(from_u32(col_b_ascii).unwrap()));
+                values.push(col_b_ascii);
+                // values.sort();
+            }
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
+
+            // build a record batch out of col `a` and col `b`
+            let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            Some((batch, (counter, col_b_ascii)))
+        }).boxed()
+    }
+    
+    struct InfiniteStream {
+        schema: SchemaRef,
+        col_b_init: u32
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+    
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // We create an iterator from the record batches and map them into Ok values,
+            // converting the iterator into a futures::stream::Stream
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream(&self.col_b_init).map(Ok)
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_infinite() {
+        let session_ctx = SessionContext::new();
+        let task_ctx: Arc<TaskContext> = session_ctx.task_ctx();
+
+        let schema = SchemaRef::new(Schema::new(vec![
+            Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
+            Field::new("b", DataType::UInt32, false),
+        ]));
+
+        let stream_1 = Arc::new(InfiniteStream {
+            schema: schema.clone(), col_b_init: 1
+        });
+
+        let stream_2 = Arc::new(InfiniteStream {
+            schema: schema.clone(), col_b_init: 2
+        });
+
+        println!("SortPreservingMergeExec result: ");
+
+        let sort = vec![
+            PhysicalSortExpr {
+                expr: col("b", &schema).unwrap(),
+                options: Default::default(),
+            },
+            PhysicalSortExpr {
+                expr: col("a", &schema).unwrap(),
+                options: Default::default(),
+            },
+        ];
+        
+        let provider = StreamingTable::try_new(schema, vec![stream_1, stream_2]).unwrap();
+
+        let plan = provider.scan(&session_ctx.state(), None, &[], None).await.unwrap();
+        let exec = Arc::new(SortPreservingMergeExec::new(sort, plan));
+        let mut stream = exec.execute(0, task_ctx).unwrap();
+        while let Some(batch) = stream.next().await {
+            println!("{}", arrow::util::pretty::pretty_format_batches(&[batch.unwrap().clone()])

Review Comment:
   Is this debug `println!` output intentional? The test would normally assert on the merged output rather than print it.



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = col_b_init.clone();
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // stop the stream at 20 batch now.
+            // Need to figure out how all the columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii = col_b_ascii + 2;
+            }
+
+            counter = counter + 1;
+            
+            // building col `a`
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                // values_vector.push(rand::thread_rng().gen_range(1..=1000));
+                values_vector.push(String::from(Uuid::new_v4().to_string()));
+            }
+            let values = StringArray::from(values_vector);
+
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+            // building col `b`
+            let mut values: Vec<u32> = Vec::new();
+            for _i in 1..=8192 {
+                // let ascii_value = rand::thread_rng().gen_range(97..=110);
+                // values.push(String::from(from_u32(col_b_ascii).unwrap()));
+                values.push(col_b_ascii);
+                // values.sort();
+            }
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
+
+            // build a record batch out of col `a` and col `b`
+            let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            Some((batch, (counter, col_b_ascii)))
+        }).boxed()
+    }
+    
+    struct InfiniteStream {
+        schema: SchemaRef,
+        col_b_init: u32
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+    
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // We create an iterator from the record batches and map them into Ok values,
+            // converting the iterator into a futures::stream::Stream
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream(&self.col_b_init).map(Ok)
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_infinite() {

Review Comment:
   What is this a test of?



##########
datafusion/core/src/physical_plan/mod.rs:
##########
@@ -402,6 +402,7 @@ pub mod union;
 pub mod unnest;
 pub mod values;
 pub mod windows;
+pub mod wrapper;

Review Comment:
   ```suggestion
   mod wrapper;
   ```
   
   I'm not sure this needs to be public
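   (If sibling modules still need access, `pub(crate) mod wrapper;` would be a middle ground.)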



##########
datafusion/core/src/physical_plan/wrapper.rs:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use arrow::row::{SortField, Rows};
+use arrow::datatypes::DataType;
+use arrow::error::ArrowError;
+use arrow_array::*;
+use arrow_array::types::*;
+use arrow::row::RowConverter;
+
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+macro_rules! downcast_dict {
+    ($array:ident, $key:ident) => {{
+        $array
+            .as_any()
+            .downcast_ref::<DictionaryArray<$key>>()
+            .unwrap()
+    }};
+}
+
+#[derive(Debug)]
+pub struct CardinalityAwareRowConverter {
+    fields: Vec<SortField>,
+    inner: Option<RowConverter>,
+    done: bool,
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
+        Ok(Self {
+            fields: fields.clone(),
+            inner: None,
+            done: false,
+        })
+    }
+    
+    pub fn size(&self) -> usize {
+        return self.inner.as_ref().unwrap().size();
+    }
+
+    pub fn convert_rows(&self, rows: &Rows) -> Result<Vec<ArrayRef>, ArrowError> {
+        self.inner.as_ref().unwrap().convert_rows(rows)
+    }
+
+    pub fn convert_columns(
+        &mut self,
+        columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
+        
+        if !self.done {

Review Comment:
   If you made `fields: Option<Vec<SortField>>` you could drop `self.done` and this would become `if let Some(fields) = self.fields.take()`

   Alternatively an explicit enum might be better
   
   
   ```rust
   pub struct CardinalityAwareRowConverter(ConverterState);

   enum ConverterState {
       Init(Vec<SortField>),
       Ready(RowConverter),
   }
   ```
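
   To make the `Option` variant concrete, a hypothetical sketch (the cardinality inspection is elided, and it assumes the arrow `RowConverter` API already used in this file):

   ```rust
   use arrow::error::ArrowError;
   use arrow::row::{RowConverter, Rows, SortField};
   use arrow_array::ArrayRef;

   struct CardinalityAwareRowConverter {
       /// Pending sort fields; `None` once the inner converter is built.
       fields: Option<Vec<SortField>>,
       inner: Option<RowConverter>,
   }

   impl CardinalityAwareRowConverter {
       fn convert_columns(&mut self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
           // `take()` consumes the pending fields on the first call,
           // which replaces the separate `done` flag.
           if let Some(fields) = self.fields.take() {
               // ... inspect `columns` here and adjust `fields` by cardinality ...
               self.inner = Some(RowConverter::new(fields)?);
           }
           self.inner.as_mut().unwrap().convert_columns(columns)
       }
   }
   ```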



##########
datafusion/core/src/physical_plan/wrapper.rs:
##########
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use arrow::row::{SortField, Rows};
+use arrow::datatypes::DataType;
+use arrow::error::ArrowError;
+use arrow_array::*;
+use arrow_array::types::*;
+use arrow::row::RowConverter;
+
+const LOW_CARDINALITY_THRESHOLD: usize = 10;
+
+macro_rules! downcast_dict {
+    ($array:ident, $key:ident) => {{
+        $array
+            .as_any()
+            .downcast_ref::<DictionaryArray<$key>>()
+            .unwrap()
+    }};
+}
+
+#[derive(Debug)]
+pub struct CardinalityAwareRowConverter {
+    fields: Vec<SortField>,
+    inner: Option<RowConverter>,
+    done: bool,
+}
+
+impl CardinalityAwareRowConverter {
+    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
+        Ok(Self {
+            fields: fields.clone(),

Review Comment:
   This clone appears to be redundant
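
   A sketch of the fix (not a committed suggestion): since `new` already owns `fields`, it can be moved into the struct directly.

   ```rust
   pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
       Ok(Self {
           fields, // moved, not cloned
           inner: None,
           done: false,
       })
   }
   ```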



##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {

Review Comment:
   This doesn't appear to yield a sorted stream? The keys for column `a` are drawn at random, so the batches aren't actually ordered on it.


