JayjeetAtGithub commented on code in PR #7401:
URL: https://github.com/apache/arrow-datafusion/pull/7401#discussion_r1313468763


##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
     use crate::test::{self, assert_is_pending};
     use crate::{assert_batches_eq, test_util};
-    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+    use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray, DictionaryArray};
+
+    use crate::physical_plan::streaming::PartitionStream;
+    use crate::physical_plan::stream::RecordBatchStreamAdapter;
+    use crate::datasource::{streaming::StreamingTable, TableProvider};
 
     use super::*;
 
+    fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+        let col_b_init_clone = *col_b_init;
+        futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+            // Stop the stream after 12000 batches.
+            // Need to figure out how all the columns in the batches are sorted.
+            if counter >= 12000 {
+                return None;
+            }
+
+            if counter % 5 == 0 {
+                col_b_ascii += 2;
+            }
+
+            counter += 1;
+
+            // building col `a`
+            let mut values_vector: Vec<String> = Vec::new();
+            for _i in 1..=8192 {
+                values_vector.push(Uuid::new_v4().to_string());
+            }
+            let values = StringArray::from(values_vector);
+
+            let mut keys_vector: Vec<i32> = Vec::new();
+            for _i in 1..=8192 {
+                keys_vector.push(rand::thread_rng().gen_range(0..8192));
+            }
+            let keys = Int32Array::from(keys_vector);
+            let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+            // building col `b`
+            let mut values: Vec<u32> = Vec::new();
+            for _i in 1..=8192 {
+                values.push(col_b_ascii);
+            }
+            let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
+
+            // build a record batch out of col `a` and col `b`
+            let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+            Some((batch, (counter, col_b_ascii)))
+        }).boxed()
+    }
+
+    struct InfiniteStream {
+        schema: SchemaRef,
+        col_b_init: u32,
+    }
+
+    impl PartitionStream for InfiniteStream {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+            // Wrap each batch from the infinite stream in `Ok` and adapt it into
+            // a `SendableRecordBatchStream` via `RecordBatchStreamAdapter`.
+            Box::pin(RecordBatchStreamAdapter::new(
+                self.schema.clone(),
+                make_infinite_sorted_stream(&self.col_b_init).map(Ok),
+            ))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_dict_merge_infinite() {

Review Comment:
   For now, I have added `#[ignore]` to this test so that it does not get run in CI.
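
   For context, a minimal sketch (not the PR's actual code) of how the `#[ignore]` attribute keeps a long-running test out of the default CI run while leaving it runnable on demand; the attribute placement and body are assumed, only the test name comes from the diff above:

   ```rust
   // Illustrative only: the real test body is not shown in this review excerpt.
   #[tokio::test]
   #[ignore] // skipped by a plain `cargo test`
   async fn test_dict_merge_infinite() {
       // long-running / infinite-stream merge test body
   }
   ```

   An ignored test can still be exercised locally with `cargo test -- --ignored` (or `cargo test -- --include-ignored` to run everything).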


