ameyc commented on PR #12186:
URL: https://github.com/apache/datafusion/pull/12186#issuecomment-2347328029

   @ozankabak would love any pointers. This code is admittedly a quick draft: 
we pass in a hashmap and recursively traverse the tree.
   
   ```
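   use std::collections::HashMap;
   use std::sync::Arc;
   
   use datafusion::physical_plan::ExecutionPlan;
   
   /// Hands out sequential node ids, starting at 0.
   pub struct NodeIdAnnotator {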
       next_id: usize,
   }
   
   impl NodeIdAnnotator {
       pub fn new() -> Self {
           NodeIdAnnotator { next_id: 0 }
       }
   
       pub fn next_node_id(&mut self) -> usize {
           let node_id = self.next_id;
           self.next_id += 1;
           node_id
       }
   }
   
   pub fn annotate_node_id_for_execution_plan(
       plan: &Arc<dyn ExecutionPlan>,
       annotator: &mut NodeIdAnnotator,
       plan_map: &mut HashMap<usize, usize>,
   ) {
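       // Post-order traversal: children are numbered before their parent,
       // so leaves get the lowest ids and the root gets the highest.
       for child in plan.children() {
           annotate_node_id_for_execution_plan(child, annotator, plan_map);
       }
       let node_id = annotator.next_node_id();
       // Key the map by the address of the plan node behind the Arc.
       let addr = Arc::as_ptr(plan) as *const () as usize;
       plan_map.insert(addr, node_id);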
   }
   ```
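
To make the numbering order concrete, here is a minimal self-contained sketch of the same post-order traversal on a toy tree. `ToyPlan` and `demo_ids` are hypothetical stand-ins invented for illustration and are not DataFusion types; only the traversal logic mirrors the draft above.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for `Arc<dyn ExecutionPlan>`; not a DataFusion type.
pub struct ToyPlan {
    name: &'static str,
    children: Vec<Arc<ToyPlan>>,
}

pub struct NodeIdAnnotator {
    next_id: usize,
}

impl NodeIdAnnotator {
    pub fn new() -> Self {
        NodeIdAnnotator { next_id: 0 }
    }

    pub fn next_node_id(&mut self) -> usize {
        let node_id = self.next_id;
        self.next_id += 1;
        node_id
    }
}

// Post-order: visit all children first, then assign an id to this node.
pub fn annotate(
    plan: &Arc<ToyPlan>,
    annotator: &mut NodeIdAnnotator,
    plan_map: &mut HashMap<usize, usize>,
) {
    for child in &plan.children {
        annotate(child, annotator, plan_map);
    }
    let addr = Arc::as_ptr(plan) as usize;
    plan_map.insert(addr, annotator.next_node_id());
}

// Builds filter -> repartition -> source and returns (name, id) pairs,
// listed from the root down to the leaf.
pub fn demo_ids() -> Vec<(&'static str, usize)> {
    let source = Arc::new(ToyPlan { name: "source", children: vec![] });
    let repartition = Arc::new(ToyPlan {
        name: "repartition",
        children: vec![Arc::clone(&source)],
    });
    let filter = Arc::new(ToyPlan {
        name: "filter",
        children: vec![Arc::clone(&repartition)],
    });

    let mut annotator = NodeIdAnnotator::new();
    let mut plan_map = HashMap::new();
    annotate(&filter, &mut annotator, &mut plan_map);

    vec![&filter, &repartition, &source]
        .into_iter()
        .map(|node| (node.name, plan_map[&(Arc::as_ptr(node) as usize)]))
        .collect()
}
```

In this sketch the leaf (`source`) ends up with id 0 and the root (`filter`) with the highest id. The ids are only valid while the same `Arc` allocations are alive, since the map is keyed by address.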
   
   Then, when executing the plan, we need to actually run this annotation and 
set a global hash map singleton.
   
   ```
       pub async fn print_stream(self) -> Result<()> {
           let plan = self.df.as_ref().clone().create_physical_plan().await?;
           let mut node_id_map: HashMap<usize, usize> = HashMap::new();
           let mut annotator = NodeIdAnnotator::new();
           annotate_node_id_for_execution_plan(&plan, &mut annotator, &mut node_id_map);
           for (key, value) in node_id_map.iter() {
               debug!("Node {}, Id {}", key, value);
           }
           let task_ctx = self.df.as_ref().clone().task_ctx();
           set_global_node_id_hash_map(&node_id_map);
           let mut stream: SendableRecordBatchStream = execute_stream(plan, Arc::new(task_ctx))?;
   ```
   Now, when creating a Stream, we need to pass it a reference to the 
ExecutionPlan it is tied to, so that it can look up its node id and figure out 
the channel_tag/checkpoint_tag it is supposed to use to coordinate checkpoints.
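
For readers wondering what the global singleton might look like, here is one possible sketch using only the standard library. The body of `set_global_node_id_hash_map` is not shown in this comment, so the implementation below is an assumption, and `lookup_node_id` is a hypothetical helper name invented for illustration.

```rust
use std::collections::HashMap;
use std::sync::{OnceLock, RwLock};

// Process-wide map from plan-node address to node id (assumed design).
static NODE_ID_MAP: OnceLock<RwLock<HashMap<usize, usize>>> = OnceLock::new();

fn global_map() -> &'static RwLock<HashMap<usize, usize>> {
    NODE_ID_MAP.get_or_init(|| RwLock::new(HashMap::new()))
}

// Replaces the global map with a copy of `map`.
pub fn set_global_node_id_hash_map(map: &HashMap<usize, usize>) {
    let mut guard = global_map().write().expect("node id map lock poisoned");
    *guard = map.clone();
}

// Hypothetical helper: a stream would call this with the address of the
// ExecutionPlan it belongs to in order to find its node id.
pub fn lookup_node_id(addr: usize) -> Option<usize> {
    global_map()
        .read()
        .expect("node id map lock poisoned")
        .get(&addr)
        .copied()
}
```

A stream would then compute the same `Arc::as_ptr(plan) as *const () as usize` address and call `lookup_node_id` with it. This is the global lookup that depends on plan nodes never being reallocated between annotation and execution.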
   
   Btw, the annotation output looks something like this:
   
   ```
   [2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783968320, Id 0
   [2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783975296, Id 1
   [2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783964912, Id 2
   [2024-09-12T21:55:33Z DEBUG denormalized::datastream] Node 5783965296, Id 3
   ```
   
   In contrast, this PR annotates the node_ids during the 
`create_physical_plan` phase, so you have the node_id at Stream creation time 
without needing a global lookup. The corresponding `print_physical_plan()` 
output now includes the node id as well.
   
   ```
   FilterExec: max@3 > 113, **node_id=3**
     StreamingWindowExec: mode=Single, gby=[sensor_name@2 as sensor_name], aggr=[count, min, max, average], window_type=[Tumbling(1s)], **node_id=2**
       RepartitionExec: partitioning=Hash([sensor_name@2], 14), input_partitions=1, **node_id=1**
         DenormalizedStreamingTableExec: partition_sizes=1, projection=[occurred_at_ms, reading, sensor_name, _streaming_internal_metadata], infinite_source=true, **node_id=0**
   ```
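
As a rough illustration of the difference, the sketch below stores the id on the node itself, so no address-keyed map is needed afterwards. `ToyNode`, `assign_ids`, `render`, and `annotated_plan_text` are hypothetical names for this toy model; they are not the API this PR adds to DataFusion.

```rust
use std::fmt::Write;

// Hypothetical plan node that carries its own id once annotated.
pub struct ToyNode {
    label: String,
    node_id: Option<usize>,
    children: Vec<ToyNode>,
}

// Post-order assignment, stored directly on each node.
pub fn assign_ids(node: &mut ToyNode, next_id: &mut usize) {
    for child in &mut node.children {
        assign_ids(child, next_id);
    }
    node.node_id = Some(*next_id);
    *next_id += 1;
}

// Renders an indented plan, appending `node_id=N` when an id is set.
pub fn render(node: &ToyNode, depth: usize, out: &mut String) {
    let indent = "  ".repeat(depth);
    match node.node_id {
        Some(id) => writeln!(out, "{}{}, node_id={}", indent, node.label, id).unwrap(),
        None => writeln!(out, "{}{}", indent, node.label).unwrap(),
    }
    for child in &node.children {
        render(child, depth + 1, out);
    }
}

// Builds a two-node toy plan, annotates it, and returns the rendered text.
pub fn annotated_plan_text() -> String {
    let mut plan = ToyNode {
        label: "FilterExec".to_string(),
        node_id: None,
        children: vec![ToyNode {
            label: "SourceExec".to_string(),
            node_id: None,
            children: vec![],
        }],
    };
    let mut next_id = 0;
    assign_ids(&mut plan, &mut next_id);

    let mut out = String::new();
    render(&plan, 0, &mut out);
    out
}
```

Because each node owns its id in this model, anything constructed from the node (such as a stream) can read the id directly, and the id survives even if the node is moved or cloned.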


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

