JayjeetAtGithub commented on code in PR #7401:
URL: https://github.com/apache/arrow-datafusion/pull/7401#discussion_r1310643106
##########
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs:
##########
@@ -286,10 +290,123 @@ mod tests {
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending};
use crate::{assert_batches_eq, test_util};
- use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
+ use arrow::array::{DictionaryArray, Int32Array, StringArray, TimestampNanosecondArray};
+
+ use crate::physical_plan::streaming::PartitionStream;
+ use crate::physical_plan::stream::RecordBatchStreamAdapter;
+ use crate::datasource::{streaming::StreamingTable, TableProvider};
use super::*;
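+
+ /// Builds a stream of 8192-row batches, capped at 12000 batches: col `a`
+ /// holds dictionary-encoded random UUID strings, and col `b` holds a single
+ /// value per batch that never decreases, so the stream is sorted on `b`.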
+ fn make_infinite_sorted_stream(col_b_init: &u32) -> BoxStream<'static, RecordBatch> {
+ let col_b_init_clone = *col_b_init;
+ futures::stream::unfold((0, col_b_init_clone), move |(mut counter, mut col_b_ascii)| async move {
+ // Stop the stream after 12000 batches.
+ // TODO: figure out how all the columns in the batches are sorted.
+ if counter >= 12000 {
+ return None;
+ }
+
+ if counter % 5 == 0 {
+ col_b_ascii += 2;
+ }
+
+ counter += 1;
+
+ // Build col `a`: 8192 dictionary-encoded random UUID strings.
+ let mut values_vector: Vec<String> = Vec::new();
+ for _i in 1..=8192 {
+ values_vector.push(Uuid::new_v4().to_string());
+ }
+ let values = StringArray::from(values_vector);
+
+ let mut keys_vector: Vec<i32> = Vec::new();
+ for _i in 1..=8192 {
+ keys_vector.push(rand::thread_rng().gen_range(0..8192));
+ }
+ let keys = Int32Array::from(keys_vector);
+ let col_a: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap());
+
+ // Build col `b`: one value repeated across the batch, never decreasing
+ // between batches, so the column stays sorted over the whole stream.
+ let values: Vec<u32> = vec![col_b_ascii; 8192];
+ let col_b: ArrayRef = Arc::new(UInt32Array::from(values));
+
+ // Build a record batch out of col `a` and col `b`.
+ let batch: RecordBatch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)]).unwrap();
+ Some((batch, (counter, col_b_ascii)))
+ }).boxed()
+ }
+
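+ /// Test-only `PartitionStream` that serves the infinite sorted stream,
+ /// seeding col `b` from `col_b_init`.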
+ struct InfiniteStream {
+ schema: SchemaRef,
+ col_b_init: u32,
+ }
+
+ impl PartitionStream for InfiniteStream {
+ fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+ // Adapt the infinite stream to a SendableRecordBatchStream by wrapping
+ // it in a RecordBatchStreamAdapter and mapping each batch to `Ok`.
+ Box::pin(RecordBatchStreamAdapter::new(
+ self.schema.clone(),
+ make_infinite_sorted_stream(&self.col_b_init).map(Ok)
+ ))
+ }
+ }
+
+ #[tokio::test]
+ async fn test_dict_merge_infinite() {
Review Comment:
This will be removed. For now, it is just a rough test to check whether memory
usage grows out of bounds.
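
For reference, here is a minimal sketch of how the `InfiniteStream` above could
be fed through `SortPreservingMergeExec` and pulled for a bounded number of
batches so the test terminates. It is illustrative rather than the PR's actual
test body: the two `col_b_init` seeds, the four-batch cap, and the test name are
made up, and the imports use the public `datafusion` crate paths (inside this
test module they would be the `crate::` paths from the diff), with constructor
signatures as in DataFusion at the time of this PR.

use std::sync::Arc;

use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::{streaming::StreamingTable, TableProvider};
use datafusion::physical_expr::{expressions::col, PhysicalSortExpr};
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use futures::StreamExt;

#[tokio::test]
async fn merge_two_infinite_partitions() -> datafusion::error::Result<()> {
    // Schema matching the batches above: dictionary-encoded strings in `a`,
    // a non-decreasing u32 in `b`.
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "a",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        ),
        Field::new("b", DataType::UInt32, false),
    ]));

    // Two partitions, each individually sorted on `b` (seeds are arbitrary).
    let partitions: Vec<Arc<dyn PartitionStream>> = vec![
        Arc::new(InfiniteStream { schema: schema.clone(), col_b_init: 1 }),
        Arc::new(InfiniteStream { schema: schema.clone(), col_b_init: 2 }),
    ];
    let table = StreamingTable::try_new(schema.clone(), partitions)?;

    // Scan the streaming table and merge its partitions on `b`.
    let ctx = SessionContext::new();
    let scan = table.scan(&ctx.state(), None, &[], None).await?;
    let sort_exprs = vec![PhysicalSortExpr {
        expr: col("b", &schema)?,
        options: SortOptions::default(),
    }];
    let merge = Arc::new(SortPreservingMergeExec::new(sort_exprs, scan));

    // Pull only a handful of merged batches instead of draining the
    // (effectively unbounded) stream.
    let mut stream = merge.execute(0, ctx.task_ctx())?;
    for _ in 0..4 {
        if stream.next().await.is_none() {
            break;
        }
    }
    Ok(())
}

Bounding the number of batches pulled, together with the generator's own
12000-batch cap in the diff, is what keeps a test over an "infinite" stream
from hanging while still exercising the merge path.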