wj-stack opened a new issue, #12532:
URL: https://github.com/apache/datafusion/issues/12532

   ### Is your feature request related to a problem or challenge?
   
   I currently have two data sources, one stored in Parquet format and the other in memory, and I need to implement a `scan` function that combines them. I tried using `UnionExec`, but it does not work correctly, especially with aggregate functions like `count`. Perhaps I should use `SortPreservingMergeExec` instead, but there are very few examples of how to use it. I would appreciate an example that reads from multiple data sources, since such sources may contain duplicate data, and in particular an example of deduplication based on multiple fields (a sketch of what I am after follows my current code below).
   ```
       async fn scan(
           &self,
           state: &dyn Session,
           projection: Option<&Vec<usize>>,
           filters: &[Expr],
           limit: Option<usize>,
       ) -> Result<Arc<dyn ExecutionPlan>> {
            // convert filters like [`a = 1`, `b = 2`] to a single filter like `a = 1 AND b = 2`
            let predicate = self.filters_to_predicate(state, filters)?;
   
           // Now, we invoke the analysis code to perform the range analysis
           let df_schema = DFSchema::try_from(self.schema())?;
   
           let boundaries = ExprBoundaries::try_new_unbounded(&self.schema())?;
   
           let analysis_result = analyze(
               &predicate,
               AnalysisContext::new(boundaries),
               df_schema.as_ref(),
           )?;
   
            // In this example, we use the PruningPredicate's literal guarantees to
            // analyze the predicate. In a real system, using
            // `PruningPredicate::prune` would likely be easier to do.
            let pruning_predicate =
                PruningPredicate::try_new(Arc::clone(&predicate), self.schema())?;
   
           debug!("pruning_predicate:{:?}", pruning_predicate);
   
            // The PruningPredicate's guarantees must all be satisfied in order for
            // the predicate to possibly evaluate to true.
           let guarantees = pruning_predicate.literal_guarantees();
   
           debug!("guarantees:{:?}", guarantees);
   
            let object_store_url = ObjectStoreUrl::parse("file://")?;
            let mut file_scan_config = FileScanConfig::new(object_store_url, self.schema())
                .with_projection(projection.cloned())
                .with_limit(limit);
   
           let mut points = vec![];
   
           for expr in &analysis_result.boundaries {
               if expr.column.name() == "show_time" {
                    // The range analysis inferred an interval for `show_time`;
                    // extract its bounds as millisecond timestamps.
                    let lower = expr.interval.lower().clone().to_array().unwrap();
                    let lower = lower
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap();

                    let upper = expr.interval.upper().clone().to_array().unwrap();
                    let upper = upper
                        .as_any()
                        .downcast_ref::<TimestampMillisecondArray>()
                        .unwrap();
   
                   debug!(
                       "{:?} lower:{:?} upper:{:?}",
                       expr.column,
                       lower.value(0),
                       upper.value(0)
                   );
   
                    let start = DateTime::<Utc>::from_timestamp_millis(lower.value(0)).unwrap();
                    let end = DateTime::<Utc>::from_timestamp_millis(upper.value(0)).unwrap();

                    // fetch the in-memory rows that fall inside the inferred time range
                    let p = self.query_with_time(start, end).await;

                    if !p.is_empty() {
                        let addr = { self.fields.read().await.clone() };
                        let additional = { self.additional.read().await.clone() };

                        let batch = create_record_batch(&p, &addr, &additional).unwrap();
                        points.push(batch);
                    }
   
                    let dirs = self.get_dir_by_time(start, end).unwrap_or_default();
   
                   info!("dirs: {:?}  {:?} {:?}", start, end, dirs);
   
                   for s in dirs {
                       let mut files = list_files_in_directory(&s).unwrap();
                       files.reverse();
   
                       let mut v = vec![];
   
                       for s in files {
                           if s.extension().unwrap() == "parquet" {
                               let f = self
                                   .files
                                    .optionally_get_with(format!("{:?}", s), async {
                                        let f = add_file(Path::new(&s)).unwrap();
                                        Some((String::from(f.0), f.1))
                                    })
                                    .await
                                    .unwrap();
   
                               v.push(PartitionedFile::new(f.0.clone(), f.1));
                           }
                       }
                       file_scan_config = file_scan_config.with_file_group(v);
                   }
   
                   break;
               }
           }
   
           let exec: Arc<ParquetExec> = ParquetExec::builder(file_scan_config)
               .with_predicate(predicate)
               .build_arc();
   
            // BUG: `count` over the union built below does not return correct results
   
            let memory =
                MemoryExec::try_new(&[points], self.schema(), projection.cloned()).unwrap();
   
           let union = Arc::new(UnionExec::new(vec![Arc::new(memory), exec]));
   
           Ok(union)
       }
   ```
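   
   For context, here is a minimal sketch of the shape I am after: union the two plans, sort on the deduplication key columns, and merge the sorted partitions with `SortPreservingMergeExec`. The column names `show_time` and `device_id` are placeholders for the actual key fields, so treat this as a sketch rather than working code:
   
    ```
    use std::sync::Arc;

    use arrow::compute::SortOptions;
    use datafusion::error::Result;
    use datafusion::physical_expr::{expressions::col, PhysicalSortExpr};
    use datafusion::physical_plan::sorts::sort::SortExec;
    use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
    use datafusion::physical_plan::union::UnionExec;
    use datafusion::physical_plan::ExecutionPlan;

    /// Combine the in-memory plan and the Parquet plan into a single stream
    /// ordered by the key columns (`show_time`, `device_id` are placeholders).
    fn merge_sorted(
        memory: Arc<dyn ExecutionPlan>,
        parquet: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let schema = memory.schema();
        let sort_exprs = vec![
            PhysicalSortExpr {
                expr: col("show_time", &schema)?,
                options: SortOptions::default(),
            },
            PhysicalSortExpr {
                expr: col("device_id", &schema)?,
                options: SortOptions::default(),
            },
        ];

        // Union keeps each input's partitions separate; sort every partition,
        // then merge the sorted partitions into one ordered partition.
        let union = Arc::new(UnionExec::new(vec![memory, parquet]));
        let sort = Arc::new(
            SortExec::new(sort_exprs.clone(), union).with_preserve_partitioning(true),
        );
        Ok(Arc::new(SortPreservingMergeExec::new(sort_exprs, sort)))
    }
    ```
   
   As far as I can tell, `SortPreservingMergeExec` only merges streams that are already sorted; it does not remove duplicates by itself. At the DataFrame level, deduplication on multiple fields can apparently be expressed with `distinct_on` (the table and column names below are again placeholders):
   
    ```
    use datafusion::prelude::*;

    async fn dedup(ctx: &SessionContext) -> datafusion::error::Result<DataFrame> {
        let df = ctx.table("my_table").await?;
        // keep one row per (show_time, device_id) pair
        df.distinct_on(
            vec![col("show_time"), col("device_id")],
            vec![col("show_time"), col("device_id"), col("value")],
            None,
        )
    }
    ```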
   
   
   ### Describe the solution you'd like
   
   _No response_
   
   ### Describe alternatives you've considered
   
   _No response_
   
   ### Additional context
   
   _No response_

