milenkovicm opened a new pull request, #1108:
URL: https://github.com/apache/datafusion-ballista/pull/1108

   # Which issue does this PR close?
   
   
   Closes #1104.
   
   # Rationale for this change
   
   # What changes are included in this PR?
   
   There are three major changes proposed with this PR:
   
   1. `BallistaConfig` has been trimmed down, removing configuration options which map one-to-one to DataFusion options.
   2. `BallistaConfig` has been integrated with `SessionConfig` and is propagated across the cluster, from client to scheduler to executors. Users can provide their own configuration, which will be propagated as well.
   3. An example of how to configure `ObjectStoreRegistry`.
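   
   Item 2 above describes configuration flowing from the client to the scheduler and on to the executors. The std-only sketch below is a simplified model of that flow, assuming the client ships every option that is set as a flat `"prefix.key"`/value string pair and each node re-applies those pairs on top of the configuration its own config producer creates. `Settings`, `to_wire` and `apply_on_node` are hypothetical names, and Ballista's actual wire format is not reproduced here. In this model a node that has not registered an extension rejects its keys, which we assume is why the overview sets up the scheduler and executors with the same config producer as the client.
   
   ```rust
   use std::collections::BTreeMap;
   
   /// Hypothetical flattened configuration: `"prefix.key"` -> value.
   pub type Settings = BTreeMap<String, String>;
   
   /// Client side (hypothetical): keep only options that are set (`Some`)
   /// and turn them into string pairs that could travel with a job.
   pub fn to_wire(entries: &[(String, Option<String>)]) -> Vec<(String, String)> {
       entries
           .iter()
           .filter_map(|(key, value)| value.as_ref().map(|v| (key.clone(), v.clone())))
           .collect()
   }
   
   /// Scheduler/executor side (hypothetical): start from the node's own
   /// defaults and apply the client's pairs on top. A key whose prefix the
   /// node has not registered (e.g. `s3` without the custom config producer)
   /// is rejected.
   pub fn apply_on_node(
       defaults: &Settings,
       registered_prefixes: &[&str],
       wire: &[(String, String)],
   ) -> Result<Settings, String> {
       let mut merged = defaults.clone();
       for (key, value) in wire {
           let prefix = key.split_once('.').map(|(p, _)| p).unwrap_or("");
           if !registered_prefixes.iter().any(|p| *p == prefix) {
               return Err(format!("no config extension registered for \"{key}\""));
           }
           merged.insert(key.clone(), value.clone());
       }
       Ok(merged)
   }
   ```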
   
   Specific `BallistaConfig` options can be set and accessed with methods provided by `SessionConfigExt`:
   
   ```rust
   let session_config = SessionConfig::new_with_ballista()
       .with_information_schema(true)
       .with_ballista_job_name("Super Cool Ballista App");
   ```
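   
   `SessionConfigExt` is an extension trait: it adds Ballista-specific constructors and builder methods to DataFusion's `SessionConfig`, a type defined in another crate. The std-only sketch below illustrates that pattern under stated assumptions: `MockSessionConfig`, `MockSessionConfigExt`, `set_str`, `get_str` and the getter `ballista_job_name` are hypothetical stand-ins rather than Ballista's API, and only the `ballista.job.name` key is taken from this PR.
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical stand-in for a config type owned by another crate:
   /// a flat map of `"prefix.key"` -> value strings.
   #[derive(Debug, Default, Clone)]
   pub struct MockSessionConfig {
       options: HashMap<String, String>,
   }
   
   impl MockSessionConfig {
       pub fn set_str(mut self, key: &str, value: &str) -> Self {
           self.options.insert(key.to_string(), value.to_string());
           self
       }
   
       pub fn get_str(&self, key: &str) -> Option<&str> {
           self.options.get(key).map(String::as_str)
       }
   }
   
   /// Extension trait adding Ballista-flavoured methods to the foreign type,
   /// in the spirit of `SessionConfigExt` (hypothetical).
   pub trait MockSessionConfigExt {
       fn with_ballista_job_name(self, name: &str) -> Self;
       fn ballista_job_name(&self) -> Option<&str>;
   }
   
   impl MockSessionConfigExt for MockSessionConfig {
       fn with_ballista_job_name(self, name: &str) -> Self {
           // Assumed to use the same key as `SET ballista.job.name = ...`.
           self.set_str("ballista.job.name", name)
       }
   
       fn ballista_job_name(&self) -> Option<&str> {
           self.get_str("ballista.job.name")
       }
   }
   ```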
   
   As `BallistaConfig` has been integrated with the `SessionConfig` infrastructure, it can also be changed with SQL `SET`:
   
   ```rust
   ctx.sql("SET ballista.job.name = 'Super Cool Ballista App'").await?.show().await?;
   ```
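   
   As we understand DataFusion's `ConfigOptions::set`, a key such as `ballista.job.name` is split at its first `.`; `datafusion.*` keys go to the built-in options, and any other prefix is looked up among the registered config extensions, which receive the remainder of the key (here `job.name`). The std-only sketch below models only that routing, with hypothetical types (`Extension`, `KvExtension`, `Options`) and no handling of the built-in `datafusion.*` options.
   
   ```rust
   use std::collections::HashMap;
   
   /// Hypothetical stand-in for a config extension such as `BallistaConfig` or `S3Options`.
   pub trait Extension {
       fn set(&mut self, key: &str, value: &str) -> Result<(), String>;
       fn get(&self, key: &str) -> Option<String>;
   }
   
   /// Simplest possible extension: stores whatever keys it receives.
   #[derive(Debug, Default)]
   pub struct KvExtension {
       values: HashMap<String, String>,
   }
   
   impl Extension for KvExtension {
       fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
           self.values.insert(key.to_string(), value.to_string());
           Ok(())
       }
   
       fn get(&self, key: &str) -> Option<String> {
           self.values.get(key).cloned()
       }
   }
   
   /// Registered extensions, keyed by prefix (`"ballista"`, `"s3"`, ...).
   #[derive(Default)]
   pub struct Options {
       extensions: HashMap<&'static str, Box<dyn Extension>>,
   }
   
   impl Options {
       pub fn register(&mut self, prefix: &'static str, ext: Box<dyn Extension>) {
           self.extensions.insert(prefix, ext);
       }
   
       /// Splits `prefix.rest` at the first `.` and forwards `rest` to the
       /// extension registered under `prefix`; unknown prefixes are errors.
       pub fn set(&mut self, key: &str, value: &str) -> Result<(), String> {
           let (prefix, rest) = key
               .split_once('.')
               .ok_or_else(|| format!("no namespace in key \"{key}\""))?;
           let ext = self
               .extensions
               .get_mut(prefix)
               .ok_or_else(|| format!("no extension registered for \"{prefix}\""))?;
           ext.set(rest, value)
       }
   
       pub fn get(&self, key: &str) -> Option<String> {
           let (prefix, rest) = key.split_once('.')?;
           self.extensions.get(prefix)?.get(rest)
       }
   }
   ```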
   
   `BallistaConfig` is propagated across the cluster automatically. For user-specific configuration, a full example of how to propagate it is provided in tests [...]
   
   Short overview:
   
   ```rust
   // Setting up configuration producer
   //
   // configuration producer registers user defined config extension
   // S3Options with relevant S3 configuration
   let config_producer = Arc::new(|| {
       SessionConfig::new_with_ballista()
           .with_information_schema(true)
           .with_option_extension(S3Options::default())
   });
   // Setting up runtime producer
   //
   // Runtime producer creates object store registry
   // which can create an object store connector based on the
   // S3Options configuration.
   let runtime_producer: RuntimeProducer =
       Arc::new(|session_config: &SessionConfig| {
           let s3options = session_config
               .options()
               .extensions
               .get::<S3Options>()
               .ok_or(DataFusionError::Configuration(
                   "S3 Options not set".to_string(),
               ))?;
   
           let config = RuntimeConfig::new().with_object_store_registry(Arc::new(
               // our custom ObjectStoreRegistry will use shared configuration
               CustomObjectStoreRegistry::new(s3options.clone()),
           ));
   
           Ok(Arc::new(RuntimeEnv::new(config)?))
       });
   
   // Session builder creates the SessionState.
   //
   // The state is configured using the runtime and configuration producers,
   // so it gets the same runtime environment and, with it, the same
   // object store registry.
   
   let session_builder = Arc::new(produce_state);
   let state = session_builder(config_producer());
   
   // setting up the ballista cluster with new runtime, configuration, and
   // session state producers
   //
   // this step is important as it sets up the scheduler and executors to
   // handle custom configuration
   let (host, port) = crate::common::setup_test_cluster_with_builders(
       config_producer,
       runtime_producer,
       session_builder,
   )
   .await;
   
   // establishing the cluster connection, using the address returned above
   let ctx: SessionContext =
       SessionContext::remote_with_state(&format!("df://{host}:{port}"), state).await?;
   
   // setting up relevant S3 options
   ctx.sql(&format!("SET s3.access_key_id = '{}'", ACCESS_KEY_ID))
       .await?
       .show()
       .await?;
   ctx.sql(&format!("SET s3.secret_access_key = '{}'", SECRET_KEY))
       .await?
       .show()
       .await?;
   ```
   
   where `S3Options` is defined as:
   
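   ```rust
   use std::any::Any;
   use std::sync::{Arc, RwLock};
   
   use datafusion::common::config::{ConfigEntry, ConfigExtension, ExtensionOptions};
   use datafusion::error::Result;
   
   // `ExtensionOptions` requires `Debug`; `Clone` backs `cloned()` below and
   // `Default` backs `S3Options::default()` in the config producer above
   #[derive(Default, Debug, Clone)]
   pub struct S3Options {
       config: Arc<RwLock<S3RegistryConfiguration>>,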
   }
   
   impl ExtensionOptions for S3Options {
       fn as_any(&self) -> &dyn Any {
           self
       }
   
       fn as_any_mut(&mut self) -> &mut dyn Any {
           self
       }
   
       fn cloned(&self) -> Box<dyn ExtensionOptions> {
           Box::new(self.clone())
       }
   
       fn set(&mut self, key: &str, value: &str) -> Result<()> {
           // omitted
           Ok(())
       }
   
       fn entries(&self) -> Vec<ConfigEntry> {
           // omitted for brevity
           todo!()
       }
   }
   
   impl ConfigExtension for S3Options {
       const PREFIX: &'static str = "s3";
   }
   
   #[derive(Default, Debug, Clone)]
   pub struct S3RegistryConfiguration {
       pub access_key_id: Option<String>,
       pub secret_access_key: Option<String>,
       pub session_token: Option<String>,
       pub region: Option<String>,
       pub endpoint: Option<String>,
       pub allow_http: Option<bool>,
   }
   ```
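   
   Two details connect `S3Options` to the runtime producer in the overview. First, config extensions are stored under their `PREFIX`, and, as we understand it, `extensions.get::<S3Options>()` finds the entry by that prefix and downcasts it to the concrete type. Second, because `S3Options` wraps its settings in `Arc<RwLock<...>>`, `clone()` (and so `cloned()`) yields a handle to the same state, so a registry built from `s3options.clone()` plausibly observes values set later via `SET s3.*`; this is our reading, the PR does not spell it out. The std-only sketch below models both points with hypothetical names (`ConfigExt`, `Extensions`, `S3Settings`, `SharedS3Options`, `Registry`, `produce_registry`); `set` takes `&self` here because the lock provides the mutability.
   
   ```rust
   use std::any::Any;
   use std::collections::HashMap;
   use std::sync::{Arc, RwLock};
   
   /// Hypothetical typed extension: each type declares its key prefix.
   pub trait ConfigExt: Any {
       const PREFIX: &'static str;
   }
   
   /// Extensions stored by prefix and recovered by downcasting.
   #[derive(Default)]
   pub struct Extensions(HashMap<&'static str, Box<dyn Any>>);
   
   impl Extensions {
       pub fn insert<T: ConfigExt>(&mut self, ext: T) {
           self.0.insert(T::PREFIX, Box::new(ext));
       }
   
       pub fn get<T: ConfigExt>(&self) -> Option<&T> {
           let boxed = self.0.get(T::PREFIX)?;
           (**boxed).downcast_ref::<T>()
       }
   }
   
   #[derive(Debug, Default, Clone)]
   pub struct S3Settings {
       pub endpoint: Option<String>,
   }
   
   /// Same shape as `S3Options`: a cheap-to-clone handle to shared settings.
   #[derive(Debug, Default, Clone)]
   pub struct SharedS3Options {
       config: Arc<RwLock<S3Settings>>,
   }
   
   impl ConfigExt for SharedS3Options {
       const PREFIX: &'static str = "s3";
   }
   
   impl SharedS3Options {
       /// Writes through the shared lock, so every clone sees the change.
       pub fn set(&self, key: &str, value: &str) -> Result<(), String> {
           let mut config = self.config.write().map_err(|e| e.to_string())?;
           match key {
               "endpoint" => config.endpoint = Some(value.to_string()),
               _ => return Err(format!("unknown key: {key}")),
           }
           Ok(())
       }
   }
   
   /// Hypothetical registry holding its own clone of the handle.
   pub struct Registry {
       options: SharedS3Options,
   }
   
   impl Registry {
       /// Reads the settings when used, not when constructed.
       pub fn endpoint(&self) -> Option<String> {
           self.options.config.read().unwrap().endpoint.clone()
       }
   }
   
   /// Shaped like the PR's runtime producer: fail if the extension is
   /// missing, otherwise build the registry from a clone of the options.
   pub fn produce_registry(extensions: &Extensions) -> Result<Registry, String> {
       let options = extensions
           .get::<SharedS3Options>()
           .ok_or_else(|| "S3 Options not set".to_string())?;
       Ok(Registry { options: options.clone() })
   }
   ```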
   
   Configuration values can be listed with:
   
   ```rust
   ctx.sql("select name, value from information_schema.df_settings where name like 's3.%'")
       .await?
       .show()
       .await?;
   ```
   
   which should return something like:
   
   ```text
   +----------------------+------------------------+
   | name                 | value                  |
   +----------------------+------------------------+
   | s3.access_key_id     | MINIO                  |
   | s3.secret_access_key | MINIOMINIO             |
   | s3.session_token     |                        |
   | s3.region            |                        |
   | s3.endpoint          | http://localhost:55001 |
   | s3.allow_http        | true                   |
   +----------------------+------------------------+
   ```
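   
   The PR omits the body of `S3Options::entries()`. Judging from the output above, each entry key carries the `s3.` prefix and options that were never set report no value. The std-only sketch below shows one hypothetical way to produce such entries; `Entry` is a local struct that only mirrors the fields of DataFusion's `ConfigEntry` (`key`, `value: Option<String>`, `description`), `S3Fields` copies the fields of `S3RegistryConfiguration`, and the description strings are made up.
   
   ```rust
   /// Local mirror of the fields of DataFusion's `ConfigEntry`; not the real type.
   #[derive(Debug, PartialEq)]
   pub struct Entry {
       pub key: String,
       pub value: Option<String>,
       pub description: &'static str,
   }
   
   /// Same fields as `S3RegistryConfiguration`.
   #[derive(Debug, Default, Clone)]
   pub struct S3Fields {
       pub access_key_id: Option<String>,
       pub secret_access_key: Option<String>,
       pub session_token: Option<String>,
       pub region: Option<String>,
       pub endpoint: Option<String>,
       pub allow_http: Option<bool>,
   }
   
   pub const PREFIX: &str = "s3";
   
   pub fn entries(c: &S3Fields) -> Vec<Entry> {
       // Keys carry the prefix, matching the `s3.<field>` names in df_settings;
       // unset options keep `value: None`.
       let entry = |name: &str, value: Option<String>, description: &'static str| Entry {
           key: format!("{}.{}", PREFIX, name),
           value,
           description,
       };
       vec![
           entry("access_key_id", c.access_key_id.clone(), "S3 access key id"),
           entry("secret_access_key", c.secret_access_key.clone(), "S3 secret access key"),
           entry("session_token", c.session_token.clone(), "S3 session token"),
           entry("region", c.region.clone(), "S3 region"),
           entry("endpoint", c.endpoint.clone(), "S3 endpoint URL"),
           // Non-string options are rendered as strings.
           entry("allow_http", c.allow_http.map(|b| b.to_string()), "Allow plain HTTP"),
       ]
   }
   ```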
   
   Follow-up work:
   
   - further cleanup of `BallistaConfig` is needed once `BallistaContext` is removed
   - this change makes the `object_store` module and features obsolete; they will be removed once `BallistaContext` is removed
   
   
   # Are there any user-facing changes?
   
   `BallistaConfig` loses most of its configuration options, namely those which map one-to-one to DataFusion options.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: github-h...@datafusion.apache.org
