JayjeetAtGithub commented on code in PR #7401:
URL: https://github.com/apache/arrow-datafusion/pull/7401#discussion_r1310674106


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{DictionaryArray, Int32Array, StringArray, TimestampNanosecondArray, UInt32Array};
+    use arrow::datatypes::Int32Type;
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
+
+    use futures::stream::BoxStream;
+    use futures::StreamExt;
+    use rand::Rng;
+    use uuid::Uuid;
 
     use super::*;
 
+    /// Generates a long stream of sorted record batches: col `a` is a
+    /// high-cardinality dictionary column, col `b` is a non-decreasing
+    /// u32 column starting at `col_b_init`.
+    fn make_infinite_sorted_stream(col_b_init: u32) -> BoxStream<'static, RecordBatch> {
+        futures::stream::unfold((0, col_b_init), move |(mut counter, mut col_b_ascii)| async move {
+            // Stop after 12000 batches so the test eventually terminates.
+            // TODO: verify that all columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            // Bump col `b` every 5 batches; it is constant within a batch
+            // and non-decreasing across batches, keeping the stream sorted.
+            if counter % 5 == 0 {
+                col_b_ascii += 2;
+            }
+
+            counter += 1;
+
+            // Build col `a`: a dictionary-encoded string column whose values
+            // are random UUIDs, i.e. high cardinality.
+            let mut values_vector: Vec<String> = Vec::new();
+            for _ in 1..=8192 {
+                values_vector.push(Uuid::new_v4().to_string());
+            }
+            let values = StringArray::from(values_vector);
+
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _ in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef =
+                Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+            // Build col `b`: constant within the batch, so trivially sorted.
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(vec![col_b_ascii; 8192]));
+
+            // Assemble cols `a` and `b` into a RecordBatch.
+            let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            Some((batch, (counter, col_b_ascii)))
+        }).boxed()
+    }
+    
+    /// A `PartitionStream` that serves the sorted stream built above.
+    struct InfiniteStream {
+        schema: SchemaRef,
+        col_b_init: u32,
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+    
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // Wrap the generator stream in a RecordBatchStreamAdapter, mapping
+            // each batch into Ok as required by SendableRecordBatchStream.
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream(self.col_b_init).map(Ok),
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_infinite() {

Review Comment:
   I will work on adding a test to check whether the `CardinalityAwareRowConverter` switches `preserve_dictionaries` according to the column cardinality. But do you have any suggestions on where to put the current e2e test so that it also checks memory usage?
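
   For reference, here is a minimal sketch of what such a cardinality-switch unit test could look like. The dictionary-building helper uses only stable arrow-rs APIs; the converter calls are left as comments because `CardinalityAwareRowConverter`'s exact constructor and accessors are defined by this PR and are assumptions here (modeled on arrow's `RowConverter`):

   ```rust
   use std::sync::Arc;

   use arrow::array::{ArrayRef, DictionaryArray, Int32Array, StringArray};
   use arrow::datatypes::Int32Type;

   /// Build a dictionary column of 8192 rows whose values repeat over
   /// `num_values` distinct strings, so cardinality can be dialed up or down.
   fn make_dict_column(num_values: usize) -> ArrayRef {
       let values: Vec<String> = (0..num_values).map(|i| format!("value-{i}")).collect();
       let keys: Vec<i32> = (0..8192).map(|i| (i % num_values) as i32).collect();
       Arc::new(
           DictionaryArray::<Int32Type>::try_new(
               Int32Array::from(keys),
               Arc::new(StringArray::from(values)),
           )
           .unwrap(),
       )
   }

   #[test]
   fn test_cardinality_switch() {
       let low = make_dict_column(10);    // far below any sensible threshold
       let high = make_dict_column(8192); // one distinct value per row
       // Hypothetical API, mirroring arrow's `RowConverter`; adjust to the
       // names actually introduced in this PR:
       // let converter = CardinalityAwareRowConverter::new(vec![
       //     SortField::new(low.data_type().clone()),
       // ]).unwrap();
       // converter.convert_columns(&[low]).unwrap();  // stays dictionary-preserving
       // converter.convert_columns(&[high]).unwrap(); // should switch it off
       let _ = (low, high);
   }
   ```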

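   On the memory question, one option might be to run the merge against the `StreamingTable` under a bounded memory pool, so the test fails fast with a resources-exhausted error instead of OOM-ing the host if dictionary preservation blows up. A rough sketch, assuming DataFusion's `RuntimeConfig::with_memory_limit` and `SessionContext::with_config_rt` APIs as of this PR, and reusing the `InfiniteStream` from the diff above (the table name and the 10 MiB cap are arbitrary):

   ```rust
   use std::sync::Arc;

   use arrow::datatypes::SchemaRef;
   use datafusion::datasource::streaming::StreamingTable;
   use datafusion::error::Result;
   use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
   use datafusion::physical_plan::streaming::PartitionStream;
   use datafusion::prelude::{SessionConfig, SessionContext};

   /// Build a SessionContext whose memory pool is capped at 10 MiB, so the
   /// merge either stays under the cap or errors out quickly.
   fn memory_capped_ctx() -> Result<SessionContext> {
       let runtime = Arc::new(RuntimeEnv::new(
           RuntimeConfig::new().with_memory_limit(10 * 1024 * 1024, 1.0),
       )?);
       Ok(SessionContext::with_config_rt(SessionConfig::new(), runtime))
   }

   /// Register two sorted partitions as a streaming table; the ORDER BY /
   /// merge query and the assertion on the error are elided here.
   fn register_infinite(ctx: &SessionContext, schema: SchemaRef) -> Result<()> {
       // `InfiniteStream` is the PartitionStream defined in the diff above.
       let partitions: Vec<Arc<dyn PartitionStream>> = vec![
           Arc::new(InfiniteStream { schema: schema.clone(), col_b_init: 1 }),
           Arc::new(InfiniteStream { schema: schema.clone(), col_b_init: 2 }),
       ];
       ctx.register_table("infinite", Arc::new(StreamingTable::try_new(schema, partitions)?))?;
       Ok(())
   }
   ```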


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
