This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 489e8076 Support sled path in config file (#80)
489e8076 is described below
commit 489e807609ae277a4320694e0fce35c0740a91ec
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Jul 6 14:56:42 2022 +0800
Support sled path in config file (#80)
---
ballista/rust/core/src/config.rs | 20 ++++++++++----------
ballista/rust/scheduler/scheduler_config_spec.toml | 6 ++++++
ballista/rust/scheduler/src/main.rs | 17 ++++++++++++-----
3 files changed, 28 insertions(+), 15 deletions(-)
diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs
index 1ff02c16..b423fe18 100644
--- a/ballista/rust/core/src/config.rs
+++ b/ballista/rust/core/src/config.rs
@@ -115,9 +115,9 @@ impl BallistaConfig {
for (name, entry) in &supported_entries {
if let Some(v) = settings.get(name) {
// validate that we can parse the user-supplied value
- let _ = Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse user-supplied value '{}' for configuration setting '{}': {}", name, v,
e)))?;
+ Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse user-supplied value '{}' for configuration setting '{}': {}", name, v,
e)))?;
} else if let Some(v) = entry.default_value.clone() {
- let _ = Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse default value '{}' for configuration setting '{}': {}", name, v, e)))?;
+ Self::parse_value(v.as_str(),
entry._data_type.clone()).map_err(|e| BallistaError::General(format!("Failed to
parse default value '{}' for configuration setting '{}': {}", name, v, e)))?;
} else {
return Err(BallistaError::General(format!(
"No value specified for mandatory configuration setting
'{}'",
@@ -156,8 +156,8 @@ impl BallistaConfig {
pub fn valid_entries() -> HashMap<String, ConfigEntry> {
let entries = vec![
ConfigEntry::new(BALLISTA_DEFAULT_SHUFFLE_PARTITIONS.to_string(),
- "Sets the default number of partitions to create when
repartitioning query stages".to_string(),
- DataType::UInt16, Some("2".to_string())),
+ "Sets the default number of partitions to create
when repartitioning query stages".to_string(),
+ DataType::UInt16, Some("2".to_string())),
ConfigEntry::new(BALLISTA_DEFAULT_BATCH_SIZE.to_string(),
"Sets the default batch size".to_string(),
DataType::UInt16, Some("8192".to_string())),
@@ -166,19 +166,19 @@ impl BallistaConfig {
DataType::Boolean, Some("true".to_string())),
ConfigEntry::new(BALLISTA_REPARTITION_AGGREGATIONS.to_string(),
"Configuration for repartition
aggregations".to_string(),
- DataType::Boolean,Some("true".to_string())),
+ DataType::Boolean, Some("true".to_string())),
ConfigEntry::new(BALLISTA_REPARTITION_WINDOWS.to_string(),
"Configuration for repartition
windows".to_string(),
- DataType::Boolean,Some("true".to_string())),
+ DataType::Boolean, Some("true".to_string())),
ConfigEntry::new(BALLISTA_PARQUET_PRUNING.to_string(),
"Configuration for parquet prune".to_string(),
- DataType::Boolean,Some("true".to_string())),
+ DataType::Boolean, Some("true".to_string())),
ConfigEntry::new(BALLISTA_WITH_INFORMATION_SCHEMA.to_string(),
- "Sets whether enable information_schema".to_string(),
- DataType::Boolean,Some("false".to_string())),
+ "Sets whether enable
information_schema".to_string(),
+ DataType::Boolean, Some("false".to_string())),
ConfigEntry::new(BALLISTA_PLUGIN_DIR.to_string(),
"Sets the plugin dir".to_string(),
- DataType::Utf8,Some("".to_string())),
+ DataType::Utf8, Some("".to_string())),
];
entries
.iter()
diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml
b/ballista/rust/scheduler/scheduler_config_spec.toml
index cca96edf..d06b523b 100644
--- a/ballista/rust/scheduler/scheduler_config_spec.toml
+++ b/ballista/rust/scheduler/scheduler_config_spec.toml
@@ -70,4 +70,10 @@ default =
"ballista_core::config::TaskSchedulingPolicy::PullStaged"
name = "plugin_dir"
type = "String"
doc = "plugin dir"
+default = "std::string::String::from(\"\")"
+
+[[param]]
+name = "sled_dir"
+type = "String"
+doc = "Sled dir: Opens a Db for saving scheduler metadata at the specified
path. This will create a new storage directory at the specified path if it does
not already exist."
default = "std::string::String::from(\"\")"
\ No newline at end of file
diff --git a/ballista/rust/scheduler/src/main.rs
b/ballista/rust/scheduler/src/main.rs
index f9f3b6ad..39e893b9 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -178,11 +178,18 @@ async fn main() -> Result<()> {
}
#[cfg(feature = "sled")]
StateBackend::Standalone => {
- // TODO: Use a real file and make path is configurable
- Arc::new(
- StandaloneClient::try_new_temporary()
- .context("Could not create standalone config backend")?,
- )
+ if opt.sled_dir.is_empty() {
+ Arc::new(
+ StandaloneClient::try_new_temporary()
+ .context("Could not create standalone config
backend")?,
+ )
+ } else {
+ println!("{}", opt.sled_dir);
+ Arc::new(
+ StandaloneClient::try_new(opt.sled_dir)
+ .context("Could not create standalone config
backend")?,
+ )
+ }
}
#[cfg(not(feature = "sled"))]
StateBackend::Standalone => {