mingmwang commented on issue #1862:
URL: 
https://github.com/apache/arrow-datafusion/issues/1862#issuecomment-1048526107


   I'm working on it now. The PR is quite large because every execution plan
   needs to become session-config aware.
   Below are some sample code snippets:
   
   In the ExecutionPlan trait, I add two methods, and each ExecutionPlan
   implementation needs to store the session_id as a member.
   
   ```
   pub trait ExecutionPlan: Debug + Send + Sync  {
       . . . . . . . . . .
       /// Return the Session id associated with the execution plan.
       fn session_id(&self) -> String;
   
       /// Return the Session configuration associated with the execution plan.
       fn session_config(&self) -> Arc<SessionConfig> {
           let session_id = self.session_id();
           let runtime = RuntimeEnv::global();
           runtime.lookup_session_config(session_id)
       }
   
   }
   
   ```
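To make the lookup pattern concrete, here is a minimal, self-contained sketch of a trait whose default method resolves the config from a process-wide registry keyed by session id. Everything besides the two trait method names is an assumption made for illustration: `SessionConfig` is a stand-in struct, and `SESSION_CONFIGS`, `register_session`, and `EmptyExec` are hypothetical names, not part of DataFusion. Unlike the snippet above, the sketch returns an `Option` so that a missing session is visible to the caller rather than causing a panic.

```rust
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, OnceLock, RwLock};

/// Hypothetical stand-in for DataFusion's `SessionConfig`.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Hypothetical process-wide registry mapping session ids to configs.
static SESSION_CONFIGS: OnceLock<RwLock<HashMap<String, Arc<SessionConfig>>>> =
    OnceLock::new();

/// Lazily create the registry on first use.
fn registry() -> &'static RwLock<HashMap<String, Arc<SessionConfig>>> {
    SESSION_CONFIGS.get_or_init(|| RwLock::new(HashMap::new()))
}

/// Register (or overwrite) the config for a session id.
pub fn register_session(session_id: &str, config: SessionConfig) {
    registry()
        .write()
        .unwrap()
        .insert(session_id.to_string(), Arc::new(config));
}

pub trait ExecutionPlan: Debug + Send + Sync {
    /// Return the session id associated with the execution plan.
    fn session_id(&self) -> String;

    /// Look up the session configuration by session id.
    /// Returns `None` when the session was never registered.
    fn session_config(&self) -> Option<Arc<SessionConfig>> {
        registry().read().unwrap().get(&self.session_id()).cloned()
    }
}

/// Hypothetical plan node that only carries its session id.
#[derive(Debug)]
pub struct EmptyExec {
    pub session_id: String,
}

impl ExecutionPlan for EmptyExec {
    fn session_id(&self) -> String {
        self.session_id.clone()
    }
}
```

With this shape, a plan node only has to store its session id; the default method does the rest, so existing implementations gain `session_config()` without extra code.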
     
   RuntimeEnv is changed to a global singleton. It is created at the very
   beginning, when the Executor or Scheduler is first initialized in the
   main() method, so RuntimeEnv no longer needs to be passed down the plan's
   execution path.
   
   ```
   pub static RUNTIME_ENV: OnceCell<RuntimeEnv> = OnceCell::new();
   
   /// Execution runtime environment. This structure is a singleton for each
   /// Scheduler/Executor instance.
   pub struct RuntimeEnv {
       /// Executor Id
       pub executor_id: Option<String>,
       /// Local Env
       pub is_local: bool,
       /// Runtime memory management
       pub memory_manager: Arc<MemoryManager>,
       /// Manage temporary files during query execution
       pub disk_manager: Arc<DiskManager>,
       /// Object stores that are registered within the Scheduler's or
       /// Executors' Runtime
       pub object_store_registry: Arc<ObjectStoreRegistry>,
       /// DataFusion task contexts that are registered within the Executors'
       /// Runtime
       pub task_context_registry: Option<Arc<TaskContextRegistry>>,
       /// DataFusion session contexts that are registered within the
       /// Scheduler's Runtime
       pub session_context_registry: Option<Arc<SessionContextRegistry>>,
   }
   
   impl RuntimeEnv {
       /// Create an executor env based on configuration
       pub fn newExecutorEnv(
           config: RuntimeConfig,
           executor_id: String,
       ) -> Result<Self> {
           let RuntimeConfig {
               memory_manager,
               disk_manager,
           } = config;
           Ok(Self {
               executor_id: Some(executor_id),
               is_local: false,
               memory_manager: MemoryManager::new(memory_manager),
               disk_manager: DiskManager::try_new(disk_manager)?,
               object_store_registry: Arc::new(ObjectStoreRegistry::new()),
               task_context_registry: Some(Arc::new(TaskContextRegistry::new())),
               session_context_registry: None,
           })
       }
   
       /// Create a scheduler env based on configuration
       pub fn newSchedulerEnv(config: RuntimeConfig) -> Result<Self> {
           let RuntimeConfig {
               memory_manager,
               disk_manager,
           } = config;
           Ok(Self {
               executor_id: None,
               is_local: false,
               memory_manager: MemoryManager::new(memory_manager),
               disk_manager: DiskManager::try_new(disk_manager)?,
               object_store_registry: Arc::new(ObjectStoreRegistry::new()),
               task_context_registry: None,
               session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
           })
       }
   
       /// Create a local env based on configuration
       pub fn newLocalEnv(config: RuntimeConfig) -> Result<Self> {
           let RuntimeConfig {
               memory_manager,
               disk_manager,
           } = config;
           Ok(Self {
               executor_id: None,
               is_local: true,
               memory_manager: MemoryManager::new(memory_manager),
               disk_manager: DiskManager::try_new(disk_manager)?,
               object_store_registry: Arc::new(ObjectStoreRegistry::new()),
               task_context_registry: None,
               session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
           })
       }
   
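       /// Return the process-wide RuntimeEnv. Panics if it has not been
       /// initialized yet.
       pub fn global() -> &'static RuntimeEnv {
           RUNTIME_ENV.get().expect("RuntimeEnv is not initialized")
       }
   
       /// Return true when this runtime belongs to a Scheduler, i.e. it is
       /// neither a local env nor an Executor env.
       pub fn isScheduler(&self) -> bool {
           (!self.is_local) && self.executor_id.is_none()
       }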
   
       /// Retrieves a `SessionConfig` by session_id
       pub fn lookup_session_config(&self, session_id: String) -> Arc<SessionConfig> {
           if self.isScheduler() {
               let session_conf = self
                   .lookup_session(session_id.as_str())
                   .expect("SessionContext doesn't exist")
                   .state
                   .lock()
                   .clone()
                   .config;
               Arc::new(session_conf)
           } else {
               self.config_from_task_context(session_id)
           }
       }
   }
   ```
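As a rough illustration of the singleton lifecycle (set once at startup, read everywhere afterwards), the sketch below uses the standard library's `std::sync::OnceLock` in place of the `OnceCell` shown above so that it compiles without external crates. The struct is trimmed to two fields, and `init_global` and the snake_case `is_scheduler` are hypothetical names chosen for the example, not part of the proposed PR.

```rust
use std::sync::OnceLock;

/// Trimmed-down stand-in for the proposed `RuntimeEnv` (assumption: only the
/// two fields needed to tell Scheduler, Executor, and local envs apart).
#[derive(Debug)]
pub struct RuntimeEnv {
    pub executor_id: Option<String>,
    pub is_local: bool,
}

/// Process-wide instance, written at most once.
static RUNTIME_ENV: OnceLock<RuntimeEnv> = OnceLock::new();

impl RuntimeEnv {
    /// Install the process-wide instance (hypothetical helper).
    /// Returns the rejected value if an instance was already installed.
    pub fn init_global(env: RuntimeEnv) -> Result<(), RuntimeEnv> {
        RUNTIME_ENV.set(env)
    }

    /// Return the process-wide instance; panics if `init_global` was never called.
    pub fn global() -> &'static RuntimeEnv {
        RUNTIME_ENV.get().expect("RuntimeEnv is not initialized")
    }

    /// A Scheduler env is neither local nor tied to an executor id.
    pub fn is_scheduler(&self) -> bool {
        !self.is_local && self.executor_id.is_none()
    }
}
```

A second call to `init_global` is rejected instead of silently replacing the environment, which matches the intent that each Scheduler or Executor process owns exactly one runtime.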
   New SessionContexts are registered with the Scheduler's RuntimeEnv, and
   task contexts are registered with the Executors' RuntimeEnv. A
   SessionContext contains the internal SessionState with the planners,
   optimizers, UDFs/UDAFs, SessionConfig, etc.
   SessionContext exists only in the Scheduler because it is related to
   planning and optimization.
   
   In the TaskDefinition proto, I add session_id and props (name-value pairs)
   so that the session_id and configurations can propagate to the executor
   side, where the physical ExecutionPlan is recreated.
   
   ```
   message TaskDefinition {
     PartitionId task_id = 1;
     bytes plan = 2;
     // Output partition for shuffle writer
     PhysicalHashRepartition output_partitioning = 3;
     string session_id = 4;
     repeated KeyValuePair props = 5;
   }
   
   ```
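On the executor side, the `props` pairs have to be turned back into a configuration before the plan is rebuilt. The sketch below shows one way this could look. It is an assumption throughout: `KeyValuePair` mirrors the proto message by hand, `SessionConfig` is a two-field stand-in, the key names `batch_size` and `target_partitions` and the default values are made up for the example, and `config_from_props` is a hypothetical helper.

```rust
/// Hand-written mirror of the proto `KeyValuePair` message (assumption).
#[derive(Debug, Clone)]
pub struct KeyValuePair {
    pub key: String,
    pub value: String,
}

/// Hypothetical stand-in for the session configuration.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
    pub target_partitions: usize,
}

impl Default for SessionConfig {
    fn default() -> Self {
        // Illustrative defaults only.
        Self { batch_size: 8192, target_partitions: 4 }
    }
}

/// Rebuild a config from the name-value pairs carried by a task definition.
/// Unknown keys are ignored; values that fail to parse yield an error.
pub fn config_from_props(props: &[KeyValuePair]) -> Result<SessionConfig, String> {
    let mut config = SessionConfig::default();
    for kv in props {
        match kv.key.as_str() {
            "batch_size" => {
                config.batch_size = kv
                    .value
                    .parse()
                    .map_err(|e| format!("invalid batch_size {:?}: {}", kv.value, e))?;
            }
            "target_partitions" => {
                config.target_partitions = kv
                    .value
                    .parse()
                    .map_err(|e| format!("invalid target_partitions {:?}: {}", kv.value, e))?;
            }
            // Keys this executor does not understand are skipped.
            _ => {}
        }
    }
    Ok(config)
}
```

Keeping the wire format as plain string pairs means new options can be added without changing the proto, at the cost of parsing and validating each value on the executor.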
   
   
   

