milenkovicm opened a new pull request, #1108: URL: https://github.com/apache/datafusion-ballista/pull/1108
# Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> Closes #1104. # Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> # What changes are included in this PR? There are two major changes proposed with this PR: 1. `BallistaConfig` has been trimmed down, removing configuration options which map one-to-one to datafusion options. 2. `BallistaConfig` has been integrated with `SessionConfig` and propagated across cluster, from client to scheduler to executors. Uses can provide their own configuration which will be propagated, as well. 3. Example how to configure `ObjectStoreRegistry` A specific `BallistaConfig` can be set/accessed with methods provided by `SessionConfigExt`: ```rust let session_config = SessionConfig::new_with_ballista() .with_information_schema(true) .with_ballista_job_name("Super Cool Ballista App"); ``` as `BallistaConfig` has been integrated with `SessionConfig` infrastructure it can be changed with SQL `SET`: ```rest ctx.sql("SET ballista.job.name = 'Super Cool Ballista App'").await?.show().await?; ``` `BallistaConfig` will automatically get propagated around cluster. How to propagate user specific configuration we provide full example in tests [...] Short overview: ```rust // Setting up configuration producer // // configuration producer registers user defined config extension // S3Option with relevant S3 configuration let config_producer = Arc::new(|| { SessionConfig::new_with_ballista() .with_information_schema(true) .with_option_extension(S3Options::default()) }); // Setting up runtime producer // // Runtime producer creates object store registry // which can create object store connecter based on // S3Option configuration. let runtime_producer: RuntimeProducer = Arc::new(|session_config: &SessionConfig| { let s3options = session_config .options() .extensions .get::<S3Options>() .ok_or(DataFusionError::Configuration( "S3 Options not set".to_string(), ))?; let config = RuntimeConfig::new().with_object_store_registry(Arc::new( // our custom ObjectStoreRegistry will use shared configuration CustomObjectStoreRegistry::new(s3options.clone()), )); Ok(Arc::new(RuntimeEnv::new(config)?)) }); // Session builder creates SessionState // // which is configured using runtime and configuration producer, // producing same runtime environment, and providing same // object store registry. let session_builder = Arc::new(produce_state); let state = session_builder(config_producer()); // setting up ballista cluster with new runtime, configuration, and session state producers // // this step is important as it setups scheduler and executor to handle // custom configuration let (host, port) = crate::common::setup_test_cluster_with_builders( config_producer, runtime_producer, session_builder, ) .await; // establishing cluster connection, let ctx: SessionContext = SessionContext::remote_with_state("df://localhost:50050", state).await?; // setting up relevant S3 options ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID)) .await? .show() .await?; ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY)) .await? .show() .await?; ``` where `S3Options` is defined like: ```rust pub struct S3Options { config: Arc<RwLock<S3RegistryConfiguration>>, } impl ExtensionOptions for S3Options { fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self } fn cloned(&self) -> Box<dyn ExtensionOptions> { Box::new(self.clone()) } fn set(&mut self, key: &str, value: &str) -> Result<()> { // omitted Ok(()) } fn entries(&self) -> Vec<ConfigEntry> { // omitted } } impl ConfigExtension for S3Options { const PREFIX: &'static str = "s3"; } #[derive(Default, Debug, Clone)] pub struct S3RegistryConfiguration { pub access_key_id: Option<String>, pub secret_access_key: Option<String>, pub session_token: Option<String>, pub region: Option<String>, pub endpoint: Option<String>, pub allow_http: Option<bool>, } ``` Configuration values could be list with: ```rust ctx.sql("select name, value from information_schema.df_settings where name like 's3.%'").await?.show().await?; ``` which should return something like: ```text +----------------------+------------------------+ | name | value | +----------------------+------------------------+ | s3.access_key_id | MINIO | | s3.secret_access_key | MINIOMINIO | | s3.session_token | | | s3.region | | | s3.endpoint | http://localhost:55001 | | s3.allow_http | true | +----------------------+------------------------+ ``` For follow up: - further cleanup is needed for `BallistaConfig` once we remove `BallistaContext` - this change makes `object_store` module and features obsolete, they will be removed once we remove `BallistaContext` # Are there any user-facing changes? `BallistaConfig` looses most of its configuration options -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org