This is an automated email from the ASF dual-hosted git repository.
DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
The following commit(s) were added to refs/heads/master by this push:
new 9b36e677b Adding global configuration support (#492)
9b36e677b is described below
commit 9b36e677bcbd11f5b1ae60803771eca6d241b7d6
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Sun Jun 14 09:26:01 2026 -0400
Adding global configuration support (#492)
* Initial change for global config
* Fixing loaders
* Removing unused config loading
---
CONFIG.md | 242 +++++++++++++++++++++
cmd/server/main.go | 25 ++-
config/custos.yaml | 59 +++++
.../ACCESS/AMIE-Processor/pkg/amie/loader.go | 79 ++++++-
.../Identity-Provisioner/pkg/comanage/loader.go | 88 +++++++-
.../SLURM/Association-Mapper/pkg/smapper/loader.go | 43 +++-
.../SLURM/Usage-Monitor/pkg/monitor/loader.go | 61 +++++-
dev-ops/compose/Makefile | 8 +
internal/config/config.go | 119 ++++++++++
internal/config/config_test.go | 102 +++++++++
internal/connectors/loader.go | 53 +++--
11 files changed, 818 insertions(+), 61 deletions(-)
diff --git a/CONFIG.md b/CONFIG.md
new file mode 100644
index 000000000..5d2b909df
--- /dev/null
+++ b/CONFIG.md
@@ -0,0 +1,242 @@
+# Configuration Guide
+
+Custos server can be configured using a YAML configuration file instead of
environment variables. This guide explains how to set up and use the
configuration system.
+
+## Quick Start
+
+1. Place your configuration file at `config/custos.yaml`
+2. Start the server:
+ ```bash
+ ./custos
+ ```
+
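+The server loads the configuration file automatically at startup. If the file cannot be read or parsed, startup fails with a `failed to load config` error; only connector settings that are missing from the file fall back to legacy environment variables.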
+
+## Configuration File Location
+
+By default, the server looks for the configuration file at
`config/custos.yaml`. You can customize this location using the `CONFIG_PATH`
environment variable:
+
+```bash
+CONFIG_PATH=/etc/custos/config.yaml ./custos
+```
+
+## Configuration Structure
+
+### Core Configuration
+
+The `core` section contains essential server settings:
+
+```yaml
+core:
+ database:
+ url: "postgresql://user:password@localhost:5432/custos_db?sslmode=disable"
+ api:
+ port: 8080
+ log_level: "info"
+```
+
+- **database.url**: PostgreSQL connection string (required)
+- **api.port**: HTTP API port (default: 8080)
+- **log_level**: Logging level (info, debug, warn, error)
+
+### Connector Configuration
+
+The `connectors` section defines which connectors are enabled and their
settings.
+
+#### SLURM Association Mapper
+
+```yaml
+connectors:
+ slurm-mapper:
+ type: "slurm-association-mapper"
+ enabled: true
+ slurm_api:
+ url: "https://slurm-api.example.com"
+ version: "0.0.38"
+ username: "slurm_admin"
+ token: "${SLURM_TOKEN}"
+```
+
+#### SLURM Usage Monitor
+
+```yaml
+ slurm-usage-monitor:
+ type: "slurm-usage-monitor"
+ enabled: true
+ slurm_api:
+ url: "https://slurm-api.example.com"
+ version: "0.0.38"
+ username: "slurm_admin"
+ token: "${SLURM_TOKEN}"
+ cluster_id: "slurm-cluster"
+```
+
+#### COmanage Identity Provisioner
+
+```yaml
+ comanage-provisioner:
+ type: "comanage-identity-provisioner"
+ enabled: true
+ registry:
+ url: "https://comanage.example.org"
+ co_id: 1
+ api_user: "comanage_api_user"
+ api_key: "${COMANAGE_API_KEY}"
+ unix_cluster:
+ id: 10
+ person_id_type: "eppn"
+ provisioning:
+ custos_cluster_id: "cluster-001"
+ default_shell: "/bin/bash"
+ homedir_prefix: "/home/"
+ http_timeout: "30s"
+```
+
+#### AMIE Processor
+
+```yaml
+ amie-processor:
+ type: "amie-processor"
+ enabled: true
+ credentials:
+ base_url: "https://amie.xsede.org"
+ site_code: "XSEDE"
+ api_key: "${AMIE_API_KEY}"
+ cluster:
+ id: "cluster-001"
+ polling:
+ poll_interval: "30s"
+ worker_interval: "5s"
+ poller_enabled: true
+ timeouts:
+ connect_timeout: "5s"
+```
+
+## Environment Variable Substitution
+
+The configuration parser supports environment variable substitution using the
`${VAR_NAME}` syntax. This allows you to:
+
+1. Keep sensitive values (API keys, passwords) out of version control
+2. Use different values for different deployment environments
+3. Reference environment variables directly in the config file
+
+### Example
+
+```yaml
+core:
+ database:
+ url: "${DATABASE_URL}"
+```
+
+In your shell:
+```bash
+export DATABASE_URL="postgresql://user:password@localhost:5432/custos_db"
+./custos
+```
+
+If an environment variable referenced in the config file is not set, the `${VAR_NAME}` placeholder is left unexpanded, so the affected setting receives the literal placeholder text:
+
+```yaml
+token: "${MISSING_TOKEN}" # Will not be replaced if MISSING_TOKEN is not set
+```
+
+## Enabling/Disabling Connectors
+
+The configuration supports any number of connectors with any names. Each
connector is identified by its `type` field which determines which loader is
used:
+
+```yaml
+connectors:
+ connector-name: # Can be any identifier
+ type: "slurm-association-mapper" # Determines which loader to use
+ enabled: true
+ # Connector-specific configuration
+```
+
+To disable a connector, set `enabled: false`:
+
+```yaml
+connectors:
+ slurm-mapper:
+ type: "slurm-association-mapper"
+ enabled: false
+ # Remaining settings may stay in place; they are ignored while the connector is disabled
+```
+
+When a connector is disabled:
+- It will not be loaded on server startup
+- An info log message will indicate it's disabled
+- The server will continue to function normally
+
+### Supported Connector Types
+
+- `slurm-association-mapper` - SLURM Association Mapper
+- `slurm-usage-monitor` - SLURM Usage Monitor
+- `comanage-identity-provisioner` - COmanage Identity Provisioner
+- `amie-processor` - AMIE Processor
+
+New connector types can be added by:
+1. Implementing the connector package
+2. Adding the loader function to `internal/connectors/loader.go`
+3. Mapping the type to the loader in `connectorLoaders` map
+
+## Fallback to Environment Variables
+
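+For backward compatibility, a connector that is listed and enabled in the configuration file falls back to its legacy environment variables for any setting missing from the file. Core settings have no such fallback: `DATABASE_DSN` and `HTTP_ADDR` are no longer read, and the server exits if the configuration file cannot be loaded.
+
+```bash
+SLURM_API="..." SLURM_USER="..." SLURM_TOKEN="..." SLURM_API_VERSION="..." ./custos
+```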
+
+## Configuration Validation
+
+The configuration is validated during parsing. Common issues:
+
+1. **Missing required fields**: If `core.database.url` is missing, the server
will fail to start
+2. **Invalid YAML syntax**: Check your YAML indentation and structure
+3. **Undefined environment variables**: Variables referenced with `${VAR}`
will be left as-is if not set
+
+## Logging
+
+Configuration loading is logged at the INFO level:
+
+```
+{
+ "time": "2026-06-12T10:30:45Z",
+ "level": "INFO",
+ "msg": "loaded config",
+ "path": "config/custos.yaml"
+}
+```
+
+Connector loading logs indicate which connectors are enabled or disabled:
+
+```
+{
+ "time": "2026-06-12T10:30:46Z",
+ "level": "INFO",
+ "msg": "loading connector",
+ "name": "slurm-mapper",
+ "type": "slurm-association-mapper"
+}
+```
+
+## Adding New Connectors
+
+To add a new connector type:
+
+1. Implement the connector package with a `LoadConnector` function
+2. Update `internal/connectors/loader.go` to import the new connector
+3. Add the type-to-loader mapping in the `connectorLoaders` map:
+
+```go
+connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup, *http.ServeMux, *config.ConnectorConfig) error{
+ "my-new-connector": mynewconnector.LoadConnector,
+ // ... existing connectors
+}
+```
+
+4. Add the connector to your config file with the corresponding type
+
+No code changes are required elsewhere — the configuration system is fully
extensible.
+
+## Example Complete Configuration
+
+See [config/custos.yaml](config/custos.yaml) for a complete example
configuration with all connectors.
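
A note on the `${VAR_NAME}` rule documented in CONFIG.md: because unset variables are left in place as literal placeholders, a startup check that lists them may make a misconfiguration easier to spot. The sketch below is illustrative only. `unresolvedVars` and its `lookup` parameter are hypothetical names and are not part of this commit; the regular expression mirrors the placeholder shape the guide describes.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// placeholderRE matches ${VAR_NAME} references as described in CONFIG.md.
var placeholderRE = regexp.MustCompile(`\$\{([^}]+)\}`)

// unresolvedVars is a hypothetical helper (not in the commit). It returns the
// name of every ${VAR} reference in input for which lookup reports "not set".
// Passing the lookup function in keeps the helper easy to test.
func unresolvedVars(input string, lookup func(string) (string, bool)) []string {
	var missing []string
	for _, m := range placeholderRE.FindAllStringSubmatch(input, -1) {
		if _, ok := lookup(m[1]); !ok {
			missing = append(missing, m[1])
		}
	}
	return missing
}

func main() {
	// Demo variable names are made up for this example.
	os.Setenv("DEMO_SET_VAR", "value")
	os.Unsetenv("DEMO_UNSET_VAR")

	yamlText := "url: ${DEMO_SET_VAR}\ntoken: ${DEMO_UNSET_VAR}\n"
	fmt.Println("unresolved:", unresolvedVars(yamlText, os.LookupEnv))
}
```

A caller could run such a check on the raw file contents before parsing and log or reject the listed names, depending on how strict startup should be.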
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 318dffcbb..b708e8d7c 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -33,6 +33,7 @@ import (
"syscall"
"time"
+ "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/internal/connectors"
"github.com/apache/airavata-custos/internal/db"
"github.com/apache/airavata-custos/internal/server"
@@ -61,13 +62,25 @@ func main() {
}
func run() error {
- dsn := os.Getenv("DATABASE_DSN")
+ configPath := envDefault("CONFIG_PATH", "config/custos.yaml")
+ cfg, err := config.LoadConfig(configPath)
+ if err != nil {
+ return errors.New("failed to load config: " + err.Error())
+ }
+
+ slog.Info("loaded config", "path", configPath)
+
+ dsn := cfg.Core.Database.URL
if dsn == "" {
- return errors.New("DATABASE_DSN environment variable is
required " +
- "(e.g.
user:pass@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4)")
+ return errors.New("database.url in config is required")
+ }
+
+ port := cfg.Core.API.Port
+ if port == 0 {
+ port = 8080
}
+ addr := ":" + strconv.Itoa(port)
- addr := envDefault("HTTP_ADDR", ":8080")
maxOpen := envInt("DB_MAX_OPEN_CONNS", 25)
maxIdle := envInt("DB_MAX_IDLE_CONNS", 5)
@@ -124,11 +137,11 @@ func run() error {
// Tracks every background goroutine spawned by connectors so we can
wait
// for them to drain on shutdown instead of killing them mid-flight.
var connectorsWG sync.WaitGroup
- if err := connectors.LoadConnectors(ctx, database, eventBus, svc,
&connectorsWG, srv.Mux()); err != nil {
+ if err := connectors.LoadConnectorsFromConfig(ctx, cfg, database,
eventBus, svc, &connectorsWG, srv.Mux()); err != nil {
return err
}
- handler := server.LoggingMiddleware(tracing.Middleware(srv))
+ handler := server.LoggingMiddleware(srv)
httpServer := &http.Server{
Addr: addr,
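
The port handling in `run()` above (zero means "use 8080", then build the `:port` listen address) can be sketched in isolation as follows. This is a minimal sketch, not the commit's code: `listenAddr` and `defaultPort` are hypothetical names, and the range check is an added assumption that the commit does not perform.

```go
package main

import (
	"fmt"
	"strconv"
)

// defaultPort matches the fallback value used in run() when core.api.port is 0.
const defaultPort = 8080

// listenAddr is a hypothetical helper. It applies the default for an unset
// port and, as an extra assumption, rejects values outside the TCP port range.
func listenAddr(port int) (string, error) {
	if port == 0 {
		port = defaultPort
	}
	if port < 1 || port > 65535 {
		return "", fmt.Errorf("core.api.port %d is outside 1-65535", port)
	}
	return ":" + strconv.Itoa(port), nil
}

func main() {
	for _, p := range []int{0, 9090, 70000} {
		addr, err := listenAddr(p)
		fmt.Println(p, addr, err)
	}
}
```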
diff --git a/config/custos.yaml b/config/custos.yaml
new file mode 100644
index 000000000..21b67f903
--- /dev/null
+++ b/config/custos.yaml
@@ -0,0 +1,59 @@
+core:
+ database:
+ url:
"postgresql://custos_user:password@localhost:5432/custos_db?sslmode=disable"
+ api:
+ port: 8080
+ log_level: "info"
+
+connectors:
+ slurm-mapper:
+ type: "slurm-association-mapper"
+ enabled: true
+ slurm_api:
+ url: "https://slurm-api.example.com"
+ version: "0.0.38"
+ username: "slurm_admin"
+ token: "${SLURM_TOKEN}" # Reference to environment variable
+
+ slurm-usage-monitor:
+ type: "slurm-usage-monitor"
+ enabled: true
+ slurm_api:
+ url: "https://slurm-api.example.com"
+ version: "0.0.38"
+ username: "slurm_admin"
+ token: "${SLURM_TOKEN}" # Reference to environment variable
+ cluster_id: "slurm-cluster"
+
+ comanage-provisioner:
+ type: "comanage-identity-provisioner"
+ enabled: true
+ registry:
+ url: "https://comanage.example.org"
+ co_id: 1
+ api_user: "comanage_api_user"
+ api_key: "${COMANAGE_API_KEY}" # Reference to environment variable
+ unix_cluster:
+ id: 10
+ person_id_type: "eppn" # Educational Person Principal Name
+ provisioning:
+ custos_cluster_id: "cluster-001"
+ default_shell: "/bin/bash"
+ homedir_prefix: "/home/"
+ http_timeout: "30s"
+
+ amie-processor:
+ type: "amie-processor"
+ enabled: true
+ credentials:
+ base_url: "https://amie.xsede.org"
+ site_code: "XSEDE"
+ api_key: "${AMIE_API_KEY}" # Reference to environment variable
+ cluster:
+ id: "cluster-001" # Must match a cluster registered in Custos
+ polling:
+ poll_interval: "30s"
+ worker_interval: "5s"
+ poller_enabled: true
+ timeouts:
+ connect_timeout: "5s"
diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
index 7363a8bfa..92a1f9b34 100644
--- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
+++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
@@ -41,6 +41,7 @@ import (
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker"
+ custosconfig "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/internal/db"
corestore "github.com/apache/airavata-custos/internal/store"
"github.com/apache/airavata-custos/internal/tracing"
@@ -66,8 +67,8 @@ const connectorName = "amie"
// @securityDefinitions.apikey CustosUserHeader
// @in header
// @name X-Custos-User-Id
-func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus
*events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup, mux
*http.ServeMux) error {
- cfg := loadConfig()
+func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus
*events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup, mux
*http.ServeMux, connectorConfig *custosconfig.ConnectorConfig) error {
+ cfg := loadConfig(connectorConfig)
if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode
== "" {
slog.Warn("AMIE credentials not fully provided, skipping AMIE
connector")
return nil
@@ -133,16 +134,72 @@ func LoadConnector(ctx context.Context, database
*sqlx.DB, eventBus *events.Bus,
return nil
}
-func loadConfig() *config.Config {
+func loadConfig(connectorConfig *custosconfig.ConnectorConfig) *config.Config {
cfg := &config.Config{}
- cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL")
- cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE")
- cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY")
- cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL",
30*time.Second)
- cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL",
5*time.Second)
- cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT",
5*time.Second)
- cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second)
- cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", true)
+
+ // Load from connector config if available
+ if connectorConfig != nil {
+ if credentials, err :=
connectorConfig.GetNestedConfig("credentials"); err == nil {
+ if url, ok := credentials["base_url"].(string); ok {
+ cfg.AMIE.BaseURL = url
+ }
+ if code, ok := credentials["site_code"].(string); ok {
+ cfg.AMIE.SiteCode = code
+ }
+ if key, ok := credentials["api_key"].(string); ok {
+ cfg.AMIE.APIKey = key
+ }
+ }
+
+ if polling, err := connectorConfig.GetNestedConfig("polling");
err == nil {
+ if interval, ok := polling["poll_interval"].(string);
ok {
+ if d, err := time.ParseDuration(interval); err
== nil {
+ cfg.AMIE.PollInterval = d
+ }
+ }
+ if interval, ok := polling["worker_interval"].(string);
ok {
+ if d, err := time.ParseDuration(interval); err
== nil {
+ cfg.AMIE.WorkerInterval = d
+ }
+ }
+ if enabled, ok := polling["poller_enabled"].(bool); ok {
+ cfg.AMIE.PollerEnabled = enabled
+ }
+ }
+
+ if timeouts, err :=
connectorConfig.GetNestedConfig("timeouts"); err == nil {
+ if timeout, ok := timeouts["connect_timeout"].(string);
ok {
+ if d, err := time.ParseDuration(timeout); err
== nil {
+ cfg.AMIE.ConnectTimeout = d
+ }
+ }
+ }
+ }
+
+ // Fall back to environment variables
+ if cfg.AMIE.BaseURL == "" {
+ cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL")
+ }
+ if cfg.AMIE.SiteCode == "" {
+ cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE")
+ }
+ if cfg.AMIE.APIKey == "" {
+ cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY")
+ }
+ if cfg.AMIE.PollInterval == 0 {
+ cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL",
30*time.Second)
+ }
+ if cfg.AMIE.WorkerInterval == 0 {
+ cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL",
5*time.Second)
+ }
+ if cfg.AMIE.ConnectTimeout == 0 {
+ cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT",
5*time.Second)
+ }
+ if cfg.AMIE.ReadTimeout == 0 {
+ cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT",
20*time.Second)
+ }
+ cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED",
cfg.AMIE.PollerEnabled)
+
return cfg
}
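
The precedence used by `loadConfig` above for duration settings (value from the connector config, then the environment variable, then a built-in default) could be condensed into one helper. The following is a sketch under that reading of the diff; `resolveDuration` is a hypothetical name, and the commit itself spells the steps out field by field instead.

```go
package main

import (
	"fmt"
	"time"
)

// resolveDuration is a hypothetical helper. It returns the first candidate
// that is non-empty and parses as a Go duration, or def if none does.
// Unparseable candidates are skipped, which mirrors how the loader ignores
// a bad config value and moves on to the next source.
func resolveDuration(def time.Duration, candidates ...string) time.Duration {
	for _, c := range candidates {
		if c == "" {
			continue
		}
		if d, err := time.ParseDuration(c); err == nil {
			return d
		}
	}
	return def
}

func main() {
	fromConfig := "" // e.g. polling.poll_interval absent from the YAML file
	fromEnv := "45s" // e.g. the value of AMIE_POLL_INTERVAL
	fmt.Println(resolveDuration(30*time.Second, fromConfig, fromEnv))
}
```

Listing the candidates in priority order keeps the "config first, environment second" rule visible at the call site.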
diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
index 1d998c110..390183dbe 100644
--- a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
+++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers"
+ "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/internal/tracing"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
@@ -43,11 +44,14 @@ func init() {
// LoadConnector wires the subscriber to the event bus. If any required env
// var is missing, the loader logs and returns nil without registering.
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup, _ *http.ServeMux) error {
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup, _ *http.ServeMux,
connectorConfig *config.ConnectorConfig) error {
- cfg, ok := loadConfigFromEnv()
+ cfg, ok := loadConfigFromConnectorConfig(connectorConfig)
if !ok {
- slog.Info("comanage provisioner: required env vars not set;
skipping")
- return nil
+ cfg, ok = loadConfigFromEnv()
+ if !ok {
+ slog.Info("comanage provisioner: required config not
set; skipping")
+ return nil
+ }
}
httpClient := client.New(cfg)
subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService,
cfg.CustosClusterID).RegisterSubscribers()
@@ -60,6 +64,84 @@ func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus
*events.Bus, coreServ
return nil
}
+func loadConfigFromConnectorConfig(connectorConfig *config.ConnectorConfig)
(client.Config, bool) {
+ if connectorConfig == nil {
+ return client.Config{}, false
+ }
+
+ var registryURL, apiUser, apiKey, personIDType, custosCluster,
defaultShell, homedirPrefix string
+ var coID, unixClusterID int
+ timeout := 30 * time.Second
+
+ // Load registry config
+ if registry, err := connectorConfig.GetNestedConfig("registry"); err ==
nil {
+ if url, ok := registry["url"].(string); ok {
+ registryURL = url
+ }
+ if u, ok := registry["api_user"].(string); ok {
+ apiUser = u
+ }
+ if k, ok := registry["api_key"].(string); ok {
+ apiKey = k
+ }
+ if id, ok := registry["co_id"].(int); ok {
+ coID = id
+ }
+ }
+
+ // Load unix_cluster config
+ if unixCluster, err := connectorConfig.GetNestedConfig("unix_cluster");
err == nil {
+ if id, ok := unixCluster["id"].(int); ok {
+ unixClusterID = id
+ }
+ if pType, ok := unixCluster["person_id_type"].(string); ok {
+ personIDType = pType
+ }
+ }
+
+ // Load provisioning config
+ if provisioning, err :=
connectorConfig.GetNestedConfig("provisioning"); err == nil {
+ if id, ok := provisioning["custos_cluster_id"].(string); ok {
+ custosCluster = id
+ }
+ if shell, ok := provisioning["default_shell"].(string); ok {
+ defaultShell = shell
+ }
+ if prefix, ok := provisioning["homedir_prefix"].(string); ok {
+ homedirPrefix = prefix
+ }
+ if timeoutStr, ok := provisioning["http_timeout"].(string); ok {
+ if d, err := time.ParseDuration(timeoutStr); err == nil
{
+ timeout = d
+ }
+ }
+ }
+
+ if registryURL == "" || coID == 0 || apiUser == "" || apiKey == "" ||
personIDType == "" || unixClusterID == 0 || custosCluster == "" {
+ return client.Config{}, false
+ }
+
+ if defaultShell == "" {
+ defaultShell = "/bin/bash"
+ }
+ if homedirPrefix == "" {
+ homedirPrefix = "/home/"
+ }
+
+ return client.Config{
+ RegistryURL: registryURL,
+ COID: coID,
+ APIUser: apiUser,
+ APIKey: apiKey,
+ PersonIDType: personIDType,
+ UnixClusterID: unixClusterID,
+ CustosClusterID: custosCluster,
+ DefaultShell: defaultShell,
+ HomedirPrefix: homedirPrefix,
+ HTTPTimeout: timeout,
+ }, true
+}
+
func loadConfigFromEnv() (client.Config, bool) {
registryURL := os.Getenv("COMANAGE_REGISTRY_URL")
coIDStr := os.Getenv("COMANAGE_CO_ID")
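
Numeric fields such as `co_id` and `unix_cluster.id` arrive as `interface{}` values whose concrete Go type depends on the decoder: as far as I know, `gopkg.in/yaml.v3` produces `int` for small integers, while `encoding/json` produces `float64`. A tolerant conversion avoids silently dropping the value when the assertion names the wrong type. The sketch below is an assumption-laden illustration; `toInt` is a hypothetical helper, not part of this commit.

```go
package main

import "fmt"

// toInt is a hypothetical helper. It accepts the numeric types that common
// decoders place into an interface{} and reports whether v held a whole number.
func toInt(v interface{}) (int, bool) {
	switch n := v.(type) {
	case int:
		return n, true
	case int64:
		return int(n), true
	case float64:
		// Accept only values with no fractional part.
		if n == float64(int(n)) {
			return int(n), true
		}
	}
	return 0, false
}

func main() {
	// Stand-in for the map returned by GetNestedConfig("registry").
	registry := map[string]interface{}{
		"url":   "https://comanage.example.org",
		"co_id": 1,
	}
	id, ok := toInt(registry["co_id"])
	fmt.Println(id, ok)
}
```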
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 1639ac905..5320b1779 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -11,17 +11,48 @@ import (
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+ "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup, _ *http.ServeMux) error {
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, _ *sync.WaitGroup, _ *http.ServeMux,
connectorConfig *config.ConnectorConfig) error {
+
+ // Read url, username, token, and API version from config or environment variables
+ var apiUrl, user, token, apiVersion string
+
+ if connectorConfig != nil {
+ slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api")
+ if err == nil {
+ if url, ok := slurmAPI["url"].(string); ok {
+ apiUrl = url
+ }
+ if u, ok := slurmAPI["username"].(string); ok {
+ user = u
+ }
+ if t, ok := slurmAPI["token"].(string); ok {
+ token = t
+ }
+ if v, ok := slurmAPI["version"].(string); ok {
+ apiVersion = v
+ }
+ }
+ }
+
+ // Fall back to environment variables
+ if apiUrl == "" {
+ apiUrl = os.Getenv("SLURM_API")
+ }
+ if user == "" {
+ user = os.Getenv("SLURM_USER")
+ }
+ if token == "" {
+ token = os.Getenv("SLURM_TOKEN")
+ }
+ if apiVersion == "" {
+ apiVersion = os.Getenv("SLURM_API_VERSION")
+ }
- // Read url, username, and password from environment variables
- apiUrl := os.Getenv("SLURM_API")
- user := os.Getenv("SLURM_USER")
- token := os.Getenv("SLURM_TOKEN")
- apiVersion := os.Getenv("SLURM_API_VERSION")
if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
slog.Info("SLURM API credentials not fully provided, skipping
SLURM Association Mapper connector")
slog.Info("SLURM API credentials", "apiUrl", apiUrl, "user",
user, "token", token, "apiVersion", apiVersion)
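
The log call above writes the SLURM token in clear text. One way to keep the diagnostic while hiding the secret is a type that implements `slog.LogValuer`, which the standard `log/slog` handlers resolve before output. This is a minimal sketch rather than a change made by this commit; the `Secret` type is a hypothetical name.

```go
package main

import (
	"log/slog"
	"os"
)

// Secret is a hypothetical wrapper type. Its LogValue method replaces the
// real value with a marker, while an empty value stays empty so the log
// still shows which credential is missing.
type Secret string

// LogValue implements slog.LogValuer.
func (s Secret) LogValue() slog.Value {
	if s == "" {
		return slog.StringValue("")
	}
	return slog.StringValue("[REDACTED]")
}

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	// Example values only; the token is wrapped so it is never printed.
	logger.Info("SLURM API credentials",
		"apiUrl", "https://slurm-api.example.com",
		"user", "slurm_admin",
		"token", Secret("s3cr3t"),
		"apiVersion", "0.0.38",
	)
}
```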
diff --git a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
index e8c314f60..9dd7d70bd 100644
--- a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
+++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
@@ -3,28 +3,65 @@ package monitor
import (
"context"
"log/slog"
+ "net/http"
"os"
"sync"
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor"
+ "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
"github.com/jmoiron/sqlx"
)
-func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, wg *sync.WaitGroup) error {
-
- // Read url, username, and password from environment variables
- apiUrl := os.Getenv("SLURM_API")
- user := os.Getenv("SLURM_USER")
- token := os.Getenv("SLURM_TOKEN")
- apiVersion := os.Getenv("SLURM_API_VERSION")
- monitorClusterID := os.Getenv("SLURM_MONITOR_CLUSTER_ID")
- if monitorClusterID == "" {
- slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting to
'slurm-cluster'")
- monitorClusterID = "slurm-cluster"
+func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, wg *sync.WaitGroup, _ *http.ServeMux,
connectorConfig *config.ConnectorConfig) error {
+
+ // Read url, username, token, API version, and cluster ID from config or environment variables
+ var apiUrl, user, token, apiVersion, clusterID string
+
+ if connectorConfig != nil {
+ slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api")
+ if err == nil {
+ if url, ok := slurmAPI["url"].(string); ok {
+ apiUrl = url
+ }
+ if u, ok := slurmAPI["username"].(string); ok {
+ user = u
+ }
+ if t, ok := slurmAPI["token"].(string); ok {
+ token = t
+ }
+ if v, ok := slurmAPI["version"].(string); ok {
+ apiVersion = v
+ }
+ }
+ if id, ok := connectorConfig.Config["cluster_id"].(string); ok {
+ clusterID = id
+ }
+ }
+
+ // Fall back to environment variables
+ if apiUrl == "" {
+ apiUrl = os.Getenv("SLURM_API")
+ }
+ if user == "" {
+ user = os.Getenv("SLURM_USER")
}
+ if token == "" {
+ token = os.Getenv("SLURM_TOKEN")
+ }
+ if apiVersion == "" {
+ apiVersion = os.Getenv("SLURM_API_VERSION")
+ }
+ if clusterID == "" {
+ clusterID = os.Getenv("SLURM_MONITOR_CLUSTER_ID")
+ if clusterID == "" {
+ slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting
to 'slurm-cluster'")
+ clusterID = "slurm-cluster"
+ }
+ }
+
if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
slog.Warn("SLURM API credentials not fully provided, skipping
SLURM Usage Monitor connector")
slog.Warn("SLURM API credentials", "apiUrl", apiUrl, "user",
user, "token", token, "apiVersion", apiVersion)
@@ -32,7 +69,7 @@ func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus
*events.Bus, coreSe
}
slurmClient := client.New(apiUrl, user, token, apiVersion)
- monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService,
monitorClusterID)
+ monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService,
clusterID)
wg.Add(1)
go func() {
defer wg.Done()
diff --git a/dev-ops/compose/Makefile b/dev-ops/compose/Makefile
new file mode 100644
index 000000000..fcea29a68
--- /dev/null
+++ b/dev-ops/compose/Makefile
@@ -0,0 +1,8 @@
+SHELL := /bin/bash
+.PHONY: base up down build cli test test-integration smoke lint keys logs
+
+up: base
+ docker compose up -d --build
+
+down:
+ docker compose down -v
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 000000000..bbffcfb92
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "fmt"
+ "os"
+ "regexp"
+ "time"
+
+ "gopkg.in/yaml.v3"
+)
+
+type Config struct {
+ Core CoreConfig `yaml:"core"`
+ Connectors map[string]*ConnectorConfig `yaml:"connectors"`
+}
+
+type CoreConfig struct {
+ Database DatabaseConfig `yaml:"database"`
+ API APIConfig `yaml:"api"`
+ LogLevel string `yaml:"log_level"`
+}
+
+type DatabaseConfig struct {
+ URL string `yaml:"url"`
+}
+
+type APIConfig struct {
+ Port int `yaml:"port"`
+}
+
+type ConnectorConfig struct {
+ Type string `yaml:"type"`
+ Enabled bool `yaml:"enabled"`
+ Config map[string]interface{} `yaml:",inline"`
+}
+
+func LoadConfig(path string) (*Config, error) {
+ data, err := os.ReadFile(path)
+ if err != nil {
+ return nil, fmt.Errorf("failed to read config file: %w", err)
+ }
+
+ expandedData := expandEnvVars(string(data))
+
+ var cfg Config
+ if err := yaml.Unmarshal([]byte(expandedData), &cfg); err != nil {
+ return nil, fmt.Errorf("failed to parse config file: %w", err)
+ }
+
+ return &cfg, nil
+}
+
+func expandEnvVars(input string) string {
+ re := regexp.MustCompile(`\$\{([^}]+)\}`)
+ return re.ReplaceAllStringFunc(input, func(match string) string {
+ envVar := match[2 : len(match)-1]
+ if value, exists := os.LookupEnv(envVar); exists {
+ return value
+ }
+ return match
+ })
+}
+
+func (c *ConnectorConfig) GetStringField(key string) (string, error) {
+ if val, ok := c.Config[key]; ok {
+ if str, ok := val.(string); ok {
+ return str, nil
+ }
+ return "", fmt.Errorf("field %s is not a string", key)
+ }
+ return "", fmt.Errorf("field %s not found", key)
+}
+
+func (c *ConnectorConfig) GetIntField(key string) (int, error) {
+ if val, ok := c.Config[key]; ok {
+ if num, ok := val.(int); ok {
+ return num, nil
+ }
+ return 0, fmt.Errorf("field %s is not an integer", key)
+ }
+ return 0, fmt.Errorf("field %s not found", key)
+}
+
+func (c *ConnectorConfig) GetDurationField(key string) (time.Duration, error) {
+ if val, ok := c.Config[key]; ok {
+ if str, ok := val.(string); ok {
+ return time.ParseDuration(str)
+ }
+ return 0, fmt.Errorf("field %s is not a string", key)
+ }
+ return 0, fmt.Errorf("field %s not found", key)
+}
+
+func (c *ConnectorConfig) GetNestedConfig(key string) (map[string]interface{},
error) {
+ if val, ok := c.Config[key]; ok {
+ if nested, ok := val.(map[string]interface{}); ok {
+ return nested, nil
+ }
+ return nil, fmt.Errorf("field %s is not a map", key)
+ }
+ return nil, fmt.Errorf("field %s not found", key)
+}
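
The connector loaders in this commit pair `GetNestedConfig` with repeated `.(string)` assertions. A path-based lookup over the same `map[string]interface{}` shape could shorten those call sites. The sketch below assumes nested sections decode to `map[string]interface{}`, as `GetNestedConfig` above expects; `lookupString` is a hypothetical helper and not part of the commit.

```go
package main

import "fmt"

// lookupString is a hypothetical helper. It walks nested maps along path and
// returns the string found at the end, or false if any step is missing or
// has an unexpected type.
func lookupString(m map[string]interface{}, path ...string) (string, bool) {
	if len(path) == 0 {
		return "", false
	}
	cur := m
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]interface{})
		if !ok {
			return "", false
		}
		cur = next
	}
	s, ok := cur[path[len(path)-1]].(string)
	return s, ok
}

func main() {
	// Stand-in for ConnectorConfig.Config after YAML decoding.
	cfg := map[string]interface{}{
		"slurm_api": map[string]interface{}{
			"url":     "https://slurm-api.example.com",
			"version": "0.0.38",
		},
		"cluster_id": "slurm-cluster",
	}
	url, ok := lookupString(cfg, "slurm_api", "url")
	fmt.Println(url, ok)
}
```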
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
new file mode 100644
index 000000000..787fbe078
--- /dev/null
+++ b/internal/config/config_test.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestLoadConfig(t *testing.T) {
+ cfg, err := LoadConfig("../../config/custos.yaml")
+ require.NoError(t, err)
+
+ assert.Equal(t, "info", cfg.Core.LogLevel)
+ assert.Equal(t, 8080, cfg.Core.API.Port)
+ assert.NotEmpty(t, cfg.Core.Database.URL)
+
+ assert.NotNil(t, cfg.Connectors)
+ assert.Greater(t, len(cfg.Connectors), 0)
+
+ slurmMapperCfg, ok := cfg.Connectors["slurm-mapper"]
+ assert.True(t, ok)
+ assert.True(t, slurmMapperCfg.Enabled)
+ assert.Equal(t, "slurm-association-mapper", slurmMapperCfg.Type)
+}
+
+func TestExpandEnvVars(t *testing.T) {
+ t.Run("expands environment variables", func(t *testing.T) {
+ os.Setenv("TEST_VAR", "test_value")
+ defer os.Unsetenv("TEST_VAR")
+
+ input := "url: ${TEST_VAR}"
+ output := expandEnvVars(input)
+ assert.Equal(t, "url: test_value", output)
+ })
+
+ t.Run("leaves unexpanded vars when env not set", func(t *testing.T) {
+ os.Unsetenv("UNDEFINED_VAR")
+ input := "url: ${UNDEFINED_VAR}"
+ output := expandEnvVars(input)
+ assert.Equal(t, "url: ${UNDEFINED_VAR}", output)
+ })
+
+ t.Run("expands multiple variables", func(t *testing.T) {
+ os.Setenv("VAR1", "value1")
+ os.Setenv("VAR2", "value2")
+ defer os.Unsetenv("VAR1")
+ defer os.Unsetenv("VAR2")
+
+ input := "url: ${VAR1}\nkey: ${VAR2}"
+ output := expandEnvVars(input)
+ assert.Equal(t, "url: value1\nkey: value2", output)
+ })
+}
+
+func TestConnectorConfigGetters(t *testing.T) {
+ cfg, err := LoadConfig("../../config/custos.yaml")
+ require.NoError(t, err)
+
+ t.Run("verify connector type field", func(t *testing.T) {
+ slurmMapper, ok := cfg.Connectors["slurm-mapper"]
+ require.True(t, ok)
+ assert.Equal(t, "slurm-association-mapper", slurmMapper.Type)
+ })
+
+ t.Run("GetNestedConfig", func(t *testing.T) {
+ slurmMapper, ok := cfg.Connectors["slurm-mapper"]
+ require.True(t, ok)
+ nested, err := slurmMapper.GetNestedConfig("slurm_api")
+ require.NoError(t, err)
+ assert.NotNil(t, nested)
+ assert.Contains(t, nested, "url")
+ assert.Contains(t, nested, "username")
+ })
+
+ t.Run("GetNestedConfig from provisioner", func(t *testing.T) {
+ provisioner, ok := cfg.Connectors["comanage-provisioner"]
+ if ok && provisioner != nil {
+ nested, err := provisioner.GetNestedConfig("registry")
+ require.NoError(t, err)
+ assert.NotNil(t, nested)
+ }
+ })
+}
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index c2ce40d45..a5e487a44 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -29,38 +29,45 @@ import (
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor"
+ "github.com/apache/airavata-custos/internal/config"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
-// LoadConnectors brings every connector up at boot. Connectors register their
-// own HTTP routes on mux; core does not.
-func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus
*events.Bus, coreService *service.Service, wg *sync.WaitGroup, mux
*http.ServeMux) error {
- slog.Info("loading connectors")
+func LoadConnectorsFromConfig(ctx context.Context, cfg *config.Config,
database *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg
*sync.WaitGroup, mux *http.ServeMux) error {
+ slog.Info("loading connectors from config")
- slog.Info("loading SLURM Association Mapper connector")
- if err := smapper.LoadConnector(ctx, database, eventBus, coreService,
wg, mux); err != nil {
- slog.Error("failed to load SLURM Association Mapper connector",
"error", err)
- return err
+ connectorLoaders := map[string]func(context.Context, *sqlx.DB,
*events.Bus, *service.Service, *sync.WaitGroup, *http.ServeMux,
*config.ConnectorConfig) error{
+ "slurm-association-mapper": smapper.LoadConnector,
+ "amie-processor": amie.LoadConnector,
+ "comanage-identity-provisioner": comanage.LoadConnector,
+ "slurm-usage-monitor": monitor.LoadConnector,
}
- slog.Info("loading AMIE connector")
- if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg,
mux); err != nil {
- slog.Error("failed to load AMIE connector", "error", err)
- return err
- }
- slog.Info("loading COmanage Identity-Provisioner connector")
- if err := comanage.LoadConnector(ctx, database, eventBus, coreService,
wg, mux); err != nil {
- slog.Error("failed to load COmanage Identity-Provisioner
connector", "error", err)
- return err
- }
+ for connectorName, connectorCfg := range cfg.Connectors {
+ if connectorCfg == nil {
+ slog.Debug("connector config is nil", "name",
connectorName)
+ continue
+ }
+
+ if !connectorCfg.Enabled {
+ slog.Info("connector is disabled", "name",
connectorName, "type", connectorCfg.Type)
+ continue
+ }
+
+ loader, ok := connectorLoaders[connectorCfg.Type]
+ if !ok {
+ slog.Warn("unknown connector type", "name",
connectorName, "type", connectorCfg.Type)
+ continue
+ }
- slog.Info("loading SLURM Usage Monitor connector")
- if err := monitor.LoadConnector(ctx, database, eventBus, coreService,
wg); err != nil {
- slog.Error("failed to load SLURM Usage Monitor connector",
"error", err)
- return err
+ slog.Info("loading connector", "name", connectorName, "type",
connectorCfg.Type)
+ if err := loader(ctx, database, eventBus, coreService, wg, mux,
connectorCfg); err != nil {
+ slog.Error("failed to load connector", "name",
connectorName, "type", connectorCfg.Type, "error", err)
+ return err
+ }
}
- slog.Info("finished loading connectors")
+ slog.Info("finished loading connectors from config")
return nil
}
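
`LoadConnectorsFromConfig` ranges over `cfg.Connectors`, and Go does not define the iteration order of a map, so connectors may start in a different order on each run. If a stable order matters, the names can be sorted first. The following sketch shows that variation with simplified stand-in types; `connector`, `loaderFunc`, and `loadAll` are hypothetical names, and the real loader signature takes more arguments.

```go
package main

import (
	"fmt"
	"sort"
)

// connector is a simplified stand-in for config.ConnectorConfig.
type connector struct {
	Type    string
	Enabled bool
}

// loaderFunc is a simplified stand-in for the loader signature in the commit.
type loaderFunc func(name string) error

// loadAll dispatches each enabled connector to the loader registered for its
// type, visiting connectors in sorted name order. It returns the names loaded.
func loadAll(connectors map[string]*connector, loaders map[string]loaderFunc) ([]string, error) {
	names := make([]string, 0, len(connectors))
	for name := range connectors {
		names = append(names, name)
	}
	sort.Strings(names)

	var loaded []string
	for _, name := range names {
		c := connectors[name]
		if c == nil || !c.Enabled {
			continue // nil or disabled entries are skipped, as in the commit
		}
		load, ok := loaders[c.Type]
		if !ok {
			continue // unknown type: the commit logs a warning and moves on
		}
		if err := load(name); err != nil {
			return loaded, fmt.Errorf("load connector %q: %w", name, err)
		}
		loaded = append(loaded, name)
	}
	return loaded, nil
}

func main() {
	noop := func(string) error { return nil }
	loaded, err := loadAll(
		map[string]*connector{
			"slurm-mapper":   {Type: "slurm-association-mapper", Enabled: true},
			"amie-processor": {Type: "amie-processor", Enabled: false},
		},
		map[string]loaderFunc{"slurm-association-mapper": noop, "amie-processor": noop},
	)
	fmt.Println(loaded, err)
}
```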