mingmwang commented on issue #1862:
URL:
https://github.com/apache/arrow-datafusion/issues/1862#issuecomment-1048526107
I'm working on it now. The PR is quite large, because the execution plan
needs to be made session-config aware.
Below are some sample code snippets:
In the ExecutionPlan trait, I added two methods, and each ExecutionPlan needs
to store the session_id as a member.
```
pub trait ExecutionPlan: Debug + Send + Sync {
    // ... (existing methods elided) ...

    /// Return the Session id associated with the execution plan.
    fn session_id(&self) -> String;

    /// Return the Session configuration associated with the execution plan.
    fn session_config(&self) -> Arc<SessionConfig> {
        let session_id = self.session_id();
        let runtime = RuntimeEnv::global();
        runtime.lookup_session_config(session_id)
    }
}
```
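As a rough, self-contained illustration of the pattern above (a default trait method that resolves the config from a process-wide lookup keyed by session id), here is a minimal sketch. It is not the PR's code: `SessionConfig` is a one-field stand-in, `SESSION_CONFIGS`, `register_session_config`, and `ScanExec` are hypothetical names, and the lookup returns an `Option` instead of panicking on an unknown session.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

/// Hypothetical stand-in for DataFusion's `SessionConfig`.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
}

/// Hypothetical process-wide map from session id to configuration.
static SESSION_CONFIGS: OnceLock<Mutex<HashMap<String, Arc<SessionConfig>>>> =
    OnceLock::new();

/// Lazily creates the global map on first access.
fn session_configs() -> &'static Mutex<HashMap<String, Arc<SessionConfig>>> {
    SESSION_CONFIGS.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Registers (or replaces) the configuration for a session.
pub fn register_session_config(session_id: &str, config: SessionConfig) {
    session_configs()
        .lock()
        .unwrap()
        .insert(session_id.to_string(), Arc::new(config));
}

/// Simplified plan trait: implementors only supply the session id.
pub trait ExecutionPlan {
    fn session_id(&self) -> String;

    /// Default method: resolve the config through the global map.
    /// Returns `None` when the session is unknown.
    fn session_config(&self) -> Option<Arc<SessionConfig>> {
        let id = self.session_id();
        let configs = session_configs().lock().unwrap();
        configs.get(&id).cloned()
    }
}

/// Toy plan node that carries its session id as a member.
pub struct ScanExec {
    pub session_id: String,
}

impl ExecutionPlan for ScanExec {
    fn session_id(&self) -> String {
        self.session_id.clone()
    }
}
```

With this shape, a plan node never holds the configuration itself. It only holds the id, so the config can be looked up wherever the plan is executed, provided the session was registered in that process first.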
RuntimeEnv is changed to a global singleton. It is created at the very
beginning, when the Executor or Scheduler is first initialized in the main()
method, so RuntimeEnv no longer needs to be passed down the plan's execution
path.
```
pub static RUNTIME_ENV: OnceCell<RuntimeEnv> = OnceCell::new();

/// Execution runtime environment. This structure is a singleton for each
/// Scheduler/Executor instance.
pub struct RuntimeEnv {
    /// Executor Id
    pub executor_id: Option<String>,
    /// Local Env
    pub is_local: bool,
    /// Runtime memory management
    pub memory_manager: Arc<MemoryManager>,
    /// Manage temporary files during query execution
    pub disk_manager: Arc<DiskManager>,
    /// Object Store that are registered within the Scheduler's or
    /// Executors' Runtime
    pub object_store_registry: Arc<ObjectStoreRegistry>,
    /// DataFusion task contexts that are registered within the Executors'
    /// Runtime
    pub task_context_registry: Option<Arc<TaskContextRegistry>>,
    /// DataFusion session contexts that are registered within the
    /// Scheduler's Runtime
    pub session_context_registry: Option<Arc<SessionContextRegistry>>,
}

impl RuntimeEnv {
    /// Create an executor env based on configuration
    pub fn newExecutorEnv(config: RuntimeConfig, executor_id: String) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: Some(executor_id),
            is_local: false,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: Some(Arc::new(TaskContextRegistry::new())),
            session_context_registry: None,
        })
    }

    /// Create a scheduler env based on configuration
    pub fn newSchedulerEnv(config: RuntimeConfig) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: None,
            is_local: false,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: None,
            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
        })
    }

    /// Create a local env based on configuration
    pub fn newLocalEnv(config: RuntimeConfig) -> Result<Self> {
        let RuntimeConfig {
            memory_manager,
            disk_manager,
        } = config;
        Ok(Self {
            executor_id: None,
            is_local: true,
            memory_manager: MemoryManager::new(memory_manager),
            disk_manager: DiskManager::try_new(disk_manager)?,
            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
            task_context_registry: None,
            session_context_registry: Some(Arc::new(SessionContextRegistry::new())),
        })
    }

    /// Return the global RuntimeEnv; panics if it has not been initialized
    pub fn global() -> &'static RuntimeEnv {
        RUNTIME_ENV.get().expect("RuntimeEnv is not initialized")
    }

    /// A non-local env without an executor id is a scheduler env
    pub fn isScheduler(&self) -> bool {
        (!self.is_local) && self.executor_id.is_none()
    }

    /// Retrieves a `SessionConfig` by session_id
    pub fn lookup_session_config(&self, session_id: String) -> Arc<SessionConfig> {
        if self.isScheduler() {
            let session_conf = self
                .lookup_session(session_id.as_str())
                .expect("SessionContext doesn't exist")
                .state
                .lock()
                .clone()
                .config;
            Arc::new(session_conf)
        } else {
            self.config_from_task_context(session_id)
        }
    }
}
```
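The set-once global described above can be sketched with only the standard library. This is a simplified illustration under several assumptions, not the PR's implementation: it uses `std::sync::OnceLock` in place of the `OnceCell` shown above, replaces the `is_local` / `executor_id` pair with a hypothetical `Role` enum, and the names `init_global` and `try_global` are invented here.

```rust
use std::sync::OnceLock;

/// Hypothetical role marker standing in for the `is_local` / `executor_id` pair.
#[derive(Debug, Clone, PartialEq)]
pub enum Role {
    Local,
    Scheduler,
    Executor { executor_id: String },
}

/// Stripped-down environment: only the role is kept for this sketch.
#[derive(Debug)]
pub struct RuntimeEnv {
    pub role: Role,
}

/// Process-wide slot that can be filled exactly once.
static RUNTIME_ENV: OnceLock<RuntimeEnv> = OnceLock::new();

impl RuntimeEnv {
    /// Installs the process-wide environment, typically from `main()`.
    /// If one is already installed, the rejected value is handed back.
    pub fn init_global(env: RuntimeEnv) -> Result<(), RuntimeEnv> {
        RUNTIME_ENV.set(env)
    }

    /// Non-panicking accessor; `None` until `init_global` has succeeded.
    pub fn try_global() -> Option<&'static RuntimeEnv> {
        RUNTIME_ENV.get()
    }

    /// True only for the scheduler role.
    pub fn is_scheduler(&self) -> bool {
        self.role == Role::Scheduler
    }
}
```

One consequence of this design is that a second initialization attempt does not replace the first environment, which matters for tests or embedded uses that start more than one context in the same process.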
New SessionContexts are registered in the Scheduler's RuntimeEnv, and task
contexts are registered in the Executors' RuntimeEnv. A SessionContext contains
the internal SessionState with the planners, optimizers, UDFs/UDAFs,
SessionConfig, etc.
SessionContext only exists in the Scheduler, because it is related to
planning and optimization.
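For illustration, a registry of this kind could be as small as a lock-protected map keyed by session id. The sketch below is an assumption about the shape, not the PR's `SessionContextRegistry`: `SessionContext` is reduced to a single field, and the method names `register_session`, `lookup_session`, and `unregister_session` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a scheduler-side session context.
#[derive(Debug)]
pub struct SessionContext {
    pub session_id: String,
}

/// Thread-safe map from session id to shared session context.
#[derive(Default)]
pub struct SessionContextRegistry {
    sessions: RwLock<HashMap<String, Arc<SessionContext>>>,
}

impl SessionContextRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds a session; returns the previous context with the same id, if any.
    pub fn register_session(&self, ctx: Arc<SessionContext>) -> Option<Arc<SessionContext>> {
        self.sessions
            .write()
            .unwrap()
            .insert(ctx.session_id.clone(), ctx)
    }

    /// Looks up a session by id without removing it.
    pub fn lookup_session(&self, session_id: &str) -> Option<Arc<SessionContext>> {
        self.sessions.read().unwrap().get(session_id).cloned()
    }

    /// Removes a session, for example when the client disconnects.
    pub fn unregister_session(&self, session_id: &str) -> Option<Arc<SessionContext>> {
        self.sessions.write().unwrap().remove(session_id)
    }
}
```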
In the TaskDefinition proto, I added session_id and props (name-value pairs)
so that the session id and configuration can propagate to the executor side,
where the physical ExecutionPlan is recreated.
```
message TaskDefinition {
  PartitionId task_id = 1;
  bytes plan = 2;
  // Output partition for shuffle writer
  PhysicalHashRepartition output_partitioning = 3;
  string session_id = 4;
  repeated KeyValuePair props = 5;
}
```
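On the executor side, the transported props would have to be turned back into a configuration. The following is a minimal sketch of that step under stated assumptions: `KeyValuePair` is a hand-written mirror of the proto message, `SessionConfig` is a two-field stand-in, and the key names (`batch_size`, `target_partitions`), the default values, and `config_from_props` are all hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical Rust mirror of the proto `KeyValuePair` message.
#[derive(Debug, Clone)]
pub struct KeyValuePair {
    pub key: String,
    pub value: String,
}

/// Hypothetical stand-in for the session configuration.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionConfig {
    pub batch_size: usize,
    pub target_partitions: usize,
}

impl Default for SessionConfig {
    /// Illustrative defaults only; not taken from DataFusion.
    fn default() -> Self {
        Self {
            batch_size: 8192,
            target_partitions: 1,
        }
    }
}

/// Rebuilds a config from transported props. Unknown keys are ignored,
/// missing keys keep their defaults, and unparsable values are errors.
pub fn config_from_props(props: &[KeyValuePair]) -> Result<SessionConfig, String> {
    let map: HashMap<&str, &str> = props
        .iter()
        .map(|kv| (kv.key.as_str(), kv.value.as_str()))
        .collect();

    let mut config = SessionConfig::default();
    if let Some(v) = map.get("batch_size") {
        config.batch_size = v
            .parse::<usize>()
            .map_err(|e| format!("invalid batch_size {v:?}: {e}"))?;
    }
    if let Some(v) = map.get("target_partitions") {
        config.target_partitions = v
            .parse::<usize>()
            .map_err(|e| format!("invalid target_partitions {v:?}: {e}"))?;
    }
    Ok(config)
}
```

Sending string pairs keeps the proto stable when new options are added, at the cost of parsing and validating values on the receiving side.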