This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch global-config in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit 98100344ef4f6f5c440d97958ab30ab75f55fba3 Author: DImuthuUpe <[email protected]> AuthorDate: Fri Jun 12 18:34:18 2026 -0400 Initial change for global config --- CONFIG.md | 242 +++++++++++++++++++++ cmd/server/main.go | 109 ++++++++++ config/custos.yaml | 59 +++++ .../ACCESS/AMIE-Processor/pkg/amie/loader.go | 79 ++++++- .../Identity-Provisioner/pkg/comanage/loader.go | 90 +++++++- .../SLURM/Association-Mapper/pkg/smapper/loader.go | 43 +++- .../SLURM/Usage-Monitor/pkg/monitor/loader.go | 60 ++++- dev-ops/compose/Makefile | 8 + internal/config/config.go | 119 ++++++++++ internal/config/config_test.go | 102 +++++++++ internal/connectors/loader.go | 47 +++- 11 files changed, 921 insertions(+), 37 deletions(-) diff --git a/CONFIG.md b/CONFIG.md new file mode 100644 index 000000000..5d2b909df --- /dev/null +++ b/CONFIG.md @@ -0,0 +1,242 @@ +# Configuration Guide + +Custos server can be configured using a YAML configuration file instead of environment variables. This guide explains how to set up and use the configuration system. + +## Quick Start + +1. Place your configuration file at `config/custos.yaml` +2. Start the server: + ```bash + ./custos + ``` + +The server will automatically load the configuration file. If the config file is not found, it falls back to legacy environment variable configuration. + +## Configuration File Location + +By default, the server looks for the configuration file at `config/custos.yaml`. You can customize this location using the `CONFIG_PATH` environment variable: + +```bash +CONFIG_PATH=/etc/custos/config.yaml ./custos +``` + +## Configuration Structure + +### Core Configuration + +The `core` section contains essential server settings: + +```yaml +core: + database: + url: "postgresql://user:password@localhost:5432/custos_db?sslmode=disable" + api: + port: 8080 + log_level: "info" +``` + +- **database.url**: PostgreSQL connection string (required) +- **api.port**: HTTP API port (default: 8080) +- **log_level**: Logging level (info, debug, warn, error) + +### Connector Configuration + +The `connectors` section defines which connectors are enabled and their settings. + +#### SLURM Association Mapper + +```yaml +connectors: + slurm-mapper: + type: "slurm-association-mapper" + enabled: true + slurm_api: + url: "https://slurm-api.example.com" + version: "0.0.38" + username: "slurm_admin" + token: "${SLURM_TOKEN}" +``` + +#### SLURM Usage Monitor + +```yaml + slurm-usage-monitor: + type: "slurm-usage-monitor" + enabled: true + slurm_api: + url: "https://slurm-api.example.com" + version: "0.0.38" + username: "slurm_admin" + token: "${SLURM_TOKEN}" + cluster_id: "slurm-cluster" +``` + +#### COmanage Identity Provisioner + +```yaml + comanage-provisioner: + type: "comanage-identity-provisioner" + enabled: true + registry: + url: "https://comanage.example.org" + co_id: 1 + api_user: "comanage_api_user" + api_key: "${COMANAGE_API_KEY}" + unix_cluster: + id: 10 + person_id_type: "eppn" + provisioning: + custos_cluster_id: "cluster-001" + default_shell: "/bin/bash" + homedir_prefix: "/home/" + http_timeout: "30s" +``` + +#### AMIE Processor + +```yaml + amie-processor: + type: "amie-processor" + enabled: true + credentials: + base_url: "https://amie.xsede.org" + site_code: "XSEDE" + api_key: "${AMIE_API_KEY}" + cluster: + id: "cluster-001" + polling: + poll_interval: "30s" + worker_interval: "5s" + poller_enabled: true + timeouts: + connect_timeout: "5s" +``` + +## Environment Variable Substitution + +The configuration parser supports environment variable substitution using the `${VAR_NAME}` syntax. This allows you to: + +1. Keep sensitive values (API keys, passwords) out of version control +2. Use different values for different deployment environments +3. Reference environment variables directly in the config file + +### Example + +```yaml +core: + database: + url: "${DATABASE_URL}" +``` + +In your shell: +```bash +export DATABASE_URL="postgresql://user:password@localhost:5432/custos_db" +./custos +``` + +If an environment variable referenced in the config file is not set, it will remain unexpanded: + +```yaml +token: "${MISSING_TOKEN}" # Will not be replaced if MISSING_TOKEN is not set +``` + +## Enabling/Disabling Connectors + +The configuration supports any number of connectors with any names. Each connector is identified by its `type` field which determines which loader is used: + +```yaml +connectors: + connector-name: # Can be any identifier + type: "slurm-association-mapper" # Determines which loader to use + enabled: true + # Connector-specific configuration +``` + +To disable a connector, set `enabled: false`: + +```yaml +connectors: + slurm-mapper: + type: "slurm-association-mapper" + enabled: false + # Rest of configuration is still required but will be ignored +``` + +When a connector is disabled: +- It will not be loaded on server startup +- An info log message will indicate it's disabled +- The server will continue to function normally + +### Supported Connector Types + +- `slurm-association-mapper` - SLURM Association Mapper +- `slurm-usage-monitor` - SLURM Usage Monitor +- `comanage-identity-provisioner` - COmanage Identity Provisioner +- `amie-processor` - AMIE Processor + +New connector types can be added by: +1. Implementing the connector package +2. Adding the loader function to `internal/connectors/loader.go` +3. Mapping the type to the loader in `connectorLoaders` map + +## Fallback to Environment Variables + +For backward compatibility, if the configuration file is not found, the server falls back to the legacy environment variable configuration: + +```bash +DATABASE_DSN="..." HTTP_ADDR=":8080" ./custos +``` + +## Configuration Validation + +The configuration is validated during parsing. Common issues: + +1. **Missing required fields**: If `core.database.url` is missing, the server will fail to start +2. **Invalid YAML syntax**: Check your YAML indentation and structure +3. **Undefined environment variables**: Variables referenced with `${VAR}` will be left as-is if not set + +## Logging + +Configuration loading is logged at the INFO level: + +``` +{ + "time": "2026-06-12T10:30:45Z", + "level": "INFO", + "msg": "loaded config", + "path": "config/custos.yaml" +} +``` + +Connector loading logs indicate which connectors are enabled or disabled: + +``` +{ + "time": "2026-06-12T10:30:46Z", + "level": "INFO", + "msg": "loading SLURM Association Mapper connector" +} +``` + +## Adding New Connectors + +To add a new connector type: + +1. Implement the connector package with a `LoadConnector` function +2. Update `internal/connectors/loader.go` to import the new connector +3. Add the type-to-loader mapping in the `connectorLoaders` map: + +```go +connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup) error{ + "my-new-connector": mynewconnector.LoadConnector, + // ... existing connectors +} +``` + +4. Add the connector to your config file with the corresponding type + +No code changes are required elsewhere — the configuration system is fully extensible. + +## Example Complete Configuration + +See [config/custos.yaml](config/custos.yaml) for a complete example configuration with all connectors. diff --git a/cmd/server/main.go b/cmd/server/main.go index 42b3159dd..f465fcd98 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -30,6 +30,7 @@ import ( "syscall" "time" + "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/internal/connectors" "github.com/apache/airavata-custos/internal/db" "github.com/apache/airavata-custos/internal/server" @@ -47,6 +48,114 @@ func main() { } func run() error { + configPath := envDefault("CONFIG_PATH", "config/custos.yaml") + cfg, err := config.LoadConfig(configPath) + if err != nil { + slog.Warn("config file not found, falling back to environment variables", "config_path", configPath, "error", err) + return runLegacy() + } + + slog.Info("loaded config", "path", configPath) + + dsn := cfg.Core.Database.URL + if dsn == "" { + return errors.New("database.url in config is required") + } + + port := cfg.Core.API.Port + if port == 0 { + port = 8080 + } + addr := ":" + strconv.Itoa(port) + + maxOpen := envInt("DB_MAX_OPEN_CONNS", 25) + maxIdle := envInt("DB_MAX_IDLE_CONNS", 5) + + database, err := db.Open(db.Config{ + DSN: dsn, + MaxOpenConns: maxOpen, + MaxIdleConns: maxIdle, + }) + if err != nil { + return err + } + defer database.Close() + + // Core schema must run before connector migrations because connector + // schemas may FK into core tables. + if err := db.MigrateEmbedded(database); err != nil { + return err + } + + // Create a new event bus instance to async messaging between service and connectors + eventBus := events.New() + svc := service.New(database, eventBus) + + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + tryBootstrap(ctx, svc) + + // Tracks every background goroutine spawned by connectors so we can wait + // for them to drain on shutdown instead of killing them mid-flight. + var connectorsWG sync.WaitGroup + if err := connectors.LoadConnectorsFromConfig(ctx, cfg, database, eventBus, svc, &connectorsWG); err != nil { + return err + } + + handler := server.LoggingMiddleware(server.New(svc)) + + httpServer := &http.Server{ + Addr: addr, + Handler: handler, + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, + } + + serverErr := make(chan error, 1) + go func() { + slog.Info("http server listening", "addr", addr) + if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + serverErr <- err + } + close(serverErr) + }() + + select { + case <-ctx.Done(): + slog.Info("shutdown signal received") + case err := <-serverErr: + if err != nil { + return err + } + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := httpServer.Shutdown(shutdownCtx); err != nil { + return err + } + + slog.Info("waiting for connectors to drain") + connectorsDone := make(chan struct{}) + go func() { + connectorsWG.Wait() + close(connectorsDone) + }() + select { + case <-connectorsDone: + slog.Info("connectors drained cleanly") + case <-time.After(30 * time.Second): + slog.Warn("connector drain timed out; some workers may have leaked") + } + + slog.Info("server stopped cleanly") + return nil +} + +func runLegacy() error { dsn := os.Getenv("DATABASE_DSN") if dsn == "" { return errors.New("DATABASE_DSN environment variable is required " + diff --git a/config/custos.yaml b/config/custos.yaml new file mode 100644 index 000000000..21b67f903 --- /dev/null +++ b/config/custos.yaml @@ -0,0 +1,59 @@ +core: + database: + url: "postgresql://custos_user:password@localhost:5432/custos_db?sslmode=disable" + api: + port: 8080 + log_level: "info" + +connectors: + slurm-mapper: + type: "slurm-association-mapper" + enabled: true + slurm_api: + url: "https://slurm-api.example.com" + version: "0.0.38" + username: "slurm_admin" + token: "${SLURM_TOKEN}" # Reference to environment variable + + slurm-usage-monitor: + type: "slurm-usage-monitor" + enabled: true + slurm_api: + url: "https://slurm-api.example.com" + version: "0.0.38" + username: "slurm_admin" + token: "${SLURM_TOKEN}" # Reference to environment variable + cluster_id: "slurm-cluster" + + comanage-provisioner: + type: "comanage-identity-provisioner" + enabled: true + registry: + url: "https://comanage.example.org" + co_id: 1 + api_user: "comanage_api_user" + api_key: "${COMANAGE_API_KEY}" # Reference to environment variable + unix_cluster: + id: 10 + person_id_type: "eppn" # Educational Person Principal Name + provisioning: + custos_cluster_id: "cluster-001" + default_shell: "/bin/bash" + homedir_prefix: "/home/" + http_timeout: "30s" + + amie-processor: + type: "amie-processor" + enabled: true + credentials: + base_url: "https://amie.xsede.org" + site_code: "XSEDE" + api_key: "${AMIE_API_KEY}" # Reference to environment variable + cluster: + id: "cluster-001" # Must match a cluster registered in Custos + polling: + poll_interval: "30s" + worker_interval: "5s" + poller_enabled: true + timeouts: + connect_timeout: "5s" diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go index 3b95e9abe..172c8607f 100644 --- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go +++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go @@ -36,6 +36,7 @@ import ( "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker" + custosconfig "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/internal/db" "github.com/apache/airavata-custos/pkg/events" coreservice "github.com/apache/airavata-custos/pkg/service" @@ -45,8 +46,8 @@ const connectorName = "amie" // LoadConnector skips silently when AMIE_BASE_URL / AMIE_SITE_CODE / // AMIE_API_KEY are not all set. -func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup) error { - cfg := loadConfig() +func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup, connectorConfig *custosconfig.ConnectorConfig) error { + cfg := loadConfig(connectorConfig) if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode == "" { slog.Warn("AMIE credentials not fully provided, skipping AMIE connector") return nil @@ -108,16 +109,72 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, return nil } -func loadConfig() *config.Config { +func loadConfig(connectorConfig *custosconfig.ConnectorConfig) *config.Config { cfg := &config.Config{} - cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL") - cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE") - cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY") - cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 30*time.Second) - cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 5*time.Second) - cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 5*time.Second) - cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second) - cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", true) + + // Load from connector config if available + if connectorConfig != nil { + if credentials, err := connectorConfig.GetNestedConfig("credentials"); err == nil { + if url, ok := credentials["base_url"].(string); ok { + cfg.AMIE.BaseURL = url + } + if code, ok := credentials["site_code"].(string); ok { + cfg.AMIE.SiteCode = code + } + if key, ok := credentials["api_key"].(string); ok { + cfg.AMIE.APIKey = key + } + } + + if polling, err := connectorConfig.GetNestedConfig("polling"); err == nil { + if interval, ok := polling["poll_interval"].(string); ok { + if d, err := time.ParseDuration(interval); err == nil { + cfg.AMIE.PollInterval = d + } + } + if interval, ok := polling["worker_interval"].(string); ok { + if d, err := time.ParseDuration(interval); err == nil { + cfg.AMIE.WorkerInterval = d + } + } + if enabled, ok := polling["poller_enabled"].(bool); ok { + cfg.AMIE.PollerEnabled = enabled + } + } + + if timeouts, err := connectorConfig.GetNestedConfig("timeouts"); err == nil { + if timeout, ok := timeouts["connect_timeout"].(string); ok { + if d, err := time.ParseDuration(timeout); err == nil { + cfg.AMIE.ConnectTimeout = d + } + } + } + } + + // Fall back to environment variables + if cfg.AMIE.BaseURL == "" { + cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL") + } + if cfg.AMIE.SiteCode == "" { + cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE") + } + if cfg.AMIE.APIKey == "" { + cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY") + } + if cfg.AMIE.PollInterval == 0 { + cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 30*time.Second) + } + if cfg.AMIE.WorkerInterval == 0 { + cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 5*time.Second) + } + if cfg.AMIE.ConnectTimeout == 0 { + cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 5*time.Second) + } + if cfg.AMIE.ReadTimeout == 0 { + cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second) + } + cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", cfg.AMIE.PollerEnabled) + return cfg } diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go index 9c9628dce..d2c712148 100644 --- a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go +++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go @@ -31,17 +31,21 @@ import ( "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client" "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers" + "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" ) // LoadConnector wires the subscriber to the event bus. If any required env // var is missing, the loader logs and returns nil without registering. -func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup) error { - cfg, ok := loadConfigFromEnv() +func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup, connectorConfig *config.ConnectorConfig) error { + cfg, ok := loadConfigFromConnectorConfig(connectorConfig) if !ok { - slog.Info("comanage provisioner: required env vars not set; skipping") - return nil + cfg, ok = loadConfigFromEnv() + if !ok { + slog.Info("comanage provisioner: required config not set; skipping") + return nil + } } httpClient := client.New(cfg) subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService, cfg.CustosClusterID).RegisterSubscribers() @@ -54,6 +58,84 @@ func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreServ return nil } +func loadConfigFromConnectorConfig(connectorConfig *config.ConnectorConfig) (client.Config, bool) { + if connectorConfig == nil { + return client.Config{}, false + } + + var registryURL, apiUser, apiKey, personIDType, custosCluster, defaultShell, homedirPrefix string + var coID, unixClusterID int + timeout := 30 * time.Second + + // Load registry config + if registry, err := connectorConfig.GetNestedConfig("registry"); err == nil { + if url, ok := registry["url"].(string); ok { + registryURL = url + } + if u, ok := registry["api_user"].(string); ok { + apiUser = u + } + if k, ok := registry["api_key"].(string); ok { + apiKey = k + } + if id, ok := registry["co_id"].(float64); ok { + coID = int(id) + } + } + + // Load unix_cluster config + if unixCluster, err := connectorConfig.GetNestedConfig("unix_cluster"); err == nil { + if id, ok := unixCluster["id"].(float64); ok { + unixClusterID = int(id) + } + if pType, ok := unixCluster["person_id_type"].(string); ok { + personIDType = pType + } + } + + // Load provisioning config + if provisioning, err := connectorConfig.GetNestedConfig("provisioning"); err == nil { + if id, ok := provisioning["custos_cluster_id"].(string); ok { + custosCluster = id + } + if shell, ok := provisioning["default_shell"].(string); ok { + defaultShell = shell + } + if prefix, ok := provisioning["homedir_prefix"].(string); ok { + homedirPrefix = prefix + } + if timeoutStr, ok := provisioning["http_timeout"].(string); ok { + if d, err := time.ParseDuration(timeoutStr); err == nil { + timeout = d + } + } + } + + if registryURL == "" || coID == 0 || apiUser == "" || apiKey == "" || personIDType == "" || unixClusterID == 0 || custosCluster == "" { + return client.Config{}, false + } + + if defaultShell == "" { + defaultShell = "/bin/bash" + } + if homedirPrefix == "" { + homedirPrefix = "/home/" + } + + return client.Config{ + RegistryURL: registryURL, + COID: coID, + APIUser: apiUser, + APIKey: apiKey, + PersonIDType: personIDType, + UnixClusterID: unixClusterID, + CustosClusterID: custosCluster, + DefaultShell: defaultShell, + HomedirPrefix: homedirPrefix, + HTTPTimeout: timeout, + }, true +} + func loadConfigFromEnv() (client.Config, bool) { registryURL := os.Getenv("COMANAGE_REGISTRY_URL") coIDStr := os.Getenv("COMANAGE_CO_ID") diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go index 66aba8e6a..4fcf1a03b 100644 --- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go +++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go @@ -10,17 +10,48 @@ import ( "github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers" "github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client" + "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" ) -func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup) error { +func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup, connectorConfig *config.ConnectorConfig) error { + + // Read url, username, and password from config or environment variables + var apiUrl, user, token, apiVersion string + + if connectorConfig != nil { + slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api") + if err == nil { + if url, ok := slurmAPI["url"].(string); ok { + apiUrl = url + } + if u, ok := slurmAPI["username"].(string); ok { + user = u + } + if t, ok := slurmAPI["token"].(string); ok { + token = t + } + if v, ok := slurmAPI["version"].(string); ok { + apiVersion = v + } + } + } + + // Fall back to environment variables + if apiUrl == "" { + apiUrl = os.Getenv("SLURM_API") + } + if user == "" { + user = os.Getenv("SLURM_USER") + } + if token == "" { + token = os.Getenv("SLURM_TOKEN") + } + if apiVersion == "" { + apiVersion = os.Getenv("SLURM_API_VERSION") + } - // Read url, username, and password from environment variables - apiUrl := os.Getenv("SLURM_API") - user := os.Getenv("SLURM_USER") - token := os.Getenv("SLURM_TOKEN") - apiVersion := os.Getenv("SLURM_API_VERSION") if apiUrl == "" || user == "" || token == "" || apiVersion == "" { slog.Info("SLURM API credentials not fully provided, skipping SLURM Association Mapper connector") slog.Info("SLURM API credentials", "apiUrl", apiUrl, "user", user, "token", token, "apiVersion", apiVersion) diff --git a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go index e8c314f60..c698d5d4a 100644 --- a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go +++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go @@ -8,23 +8,59 @@ import ( "github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client" "github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor" + "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" "github.com/jmoiron/sqlx" ) -func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg *sync.WaitGroup) error { - - // Read url, username, and password from environment variables - apiUrl := os.Getenv("SLURM_API") - user := os.Getenv("SLURM_USER") - token := os.Getenv("SLURM_TOKEN") - apiVersion := os.Getenv("SLURM_API_VERSION") - monitorClusterID := os.Getenv("SLURM_MONITOR_CLUSTER_ID") - if monitorClusterID == "" { - slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting to 'slurm-cluster'") - monitorClusterID = "slurm-cluster" +func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg *sync.WaitGroup, connectorConfig *config.ConnectorConfig) error { + + // Read url, username, and password from config or environment variables + var apiUrl, user, token, apiVersion, clusterID string + + if connectorConfig != nil { + slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api") + if err == nil { + if url, ok := slurmAPI["url"].(string); ok { + apiUrl = url + } + if u, ok := slurmAPI["username"].(string); ok { + user = u + } + if t, ok := slurmAPI["token"].(string); ok { + token = t + } + if v, ok := slurmAPI["version"].(string); ok { + apiVersion = v + } + } + if id, ok := connectorConfig.Config["cluster_id"].(string); ok { + clusterID = id + } + } + + // Fall back to environment variables + if apiUrl == "" { + apiUrl = os.Getenv("SLURM_API") + } + if user == "" { + user = os.Getenv("SLURM_USER") } + if token == "" { + token = os.Getenv("SLURM_TOKEN") + } + if apiVersion == "" { + apiVersion = os.Getenv("SLURM_API_VERSION") + } + if clusterID == "" { + clusterID = os.Getenv("SLURM_MONITOR_CLUSTER_ID") + if clusterID == "" { + slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting to 'slurm-cluster'") + clusterID = "slurm-cluster" + } + } + if apiUrl == "" || user == "" || token == "" || apiVersion == "" { slog.Warn("SLURM API credentials not fully provided, skipping SLURM Usage Monitor connector") slog.Warn("SLURM API credentials", "apiUrl", apiUrl, "user", user, "token", token, "apiVersion", apiVersion) @@ -32,7 +68,7 @@ func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus, coreSe } slurmClient := client.New(apiUrl, user, token, apiVersion) - monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService, monitorClusterID) + monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService, clusterID) wg.Add(1) go func() { defer wg.Done() diff --git a/dev-ops/compose/Makefile b/dev-ops/compose/Makefile new file mode 100644 index 000000000..fcea29a68 --- /dev/null +++ b/dev-ops/compose/Makefile @@ -0,0 +1,8 @@ +SHELL := /bin/bash +.PHONY: base up down build cli test test-integration smoke lint keys logs + +up: base + docker compose up -d --build + +down: + docker compose down -v diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 000000000..bbffcfb92 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "fmt" + "os" + "regexp" + "time" + + "gopkg.in/yaml.v3" +) + +type Config struct { + Core CoreConfig `yaml:"core"` + Connectors map[string]*ConnectorConfig `yaml:"connectors"` +} + +type CoreConfig struct { + Database DatabaseConfig `yaml:"database"` + API APIConfig `yaml:"api"` + LogLevel string `yaml:"log_level"` +} + +type DatabaseConfig struct { + URL string `yaml:"url"` +} + +type APIConfig struct { + Port int `yaml:"port"` +} + +type ConnectorConfig struct { + Type string `yaml:"type"` + Enabled bool `yaml:"enabled"` + Config map[string]interface{} `yaml:",inline"` +} + +func LoadConfig(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + + expandedData := expandEnvVars(string(data)) + + var cfg Config + if err := yaml.Unmarshal([]byte(expandedData), &cfg); err != nil { + return nil, fmt.Errorf("failed to parse config file: %w", err) + } + + return &cfg, nil +} + +func expandEnvVars(input string) string { + re := regexp.MustCompile(`\$\{([^}]+)\}`) + return re.ReplaceAllStringFunc(input, func(match string) string { + envVar := match[2 : len(match)-1] + if value, exists := os.LookupEnv(envVar); exists { + return value + } + return match + }) +} + +func (c *ConnectorConfig) GetStringField(key string) (string, error) { + if val, ok := c.Config[key]; ok { + if str, ok := val.(string); ok { + return str, nil + } + return "", fmt.Errorf("field %s is not a string", key) + } + return "", fmt.Errorf("field %s not found", key) +} + +func (c *ConnectorConfig) GetIntField(key string) (int, error) { + if val, ok := c.Config[key]; ok { + if num, ok := val.(int); ok { + return num, nil + } + return 0, fmt.Errorf("field %s is not an integer", key) + } + return 0, fmt.Errorf("field %s not found", key) +} + +func (c *ConnectorConfig) GetDurationField(key string) (time.Duration, error) { + if val, ok := c.Config[key]; ok { + if str, ok := val.(string); ok { + return time.ParseDuration(str) + } + return 0, fmt.Errorf("field %s is not a string", key) + } + return 0, fmt.Errorf("field %s not found", key) +} + +func (c *ConnectorConfig) GetNestedConfig(key string) (map[string]interface{}, error) { + if val, ok := c.Config[key]; ok { + if nested, ok := val.(map[string]interface{}); ok { + return nested, nil + } + return nil, fmt.Errorf("field %s is not a map", key) + } + return nil, fmt.Errorf("field %s not found", key) +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 000000000..787fbe078 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package config + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLoadConfig(t *testing.T) { + cfg, err := LoadConfig("../../config/custos.yaml") + require.NoError(t, err) + + assert.Equal(t, "info", cfg.Core.LogLevel) + assert.Equal(t, 8080, cfg.Core.API.Port) + assert.NotEmpty(t, cfg.Core.Database.URL) + + assert.NotNil(t, cfg.Connectors) + assert.Greater(t, len(cfg.Connectors), 0) + + slurmMapperCfg, ok := cfg.Connectors["slurm-mapper"] + assert.True(t, ok) + assert.True(t, slurmMapperCfg.Enabled) + assert.Equal(t, "slurm-association-mapper", slurmMapperCfg.Type) +} + +func TestExpandEnvVars(t *testing.T) { + t.Run("expands environment variables", func(t *testing.T) { + os.Setenv("TEST_VAR", "test_value") + defer os.Unsetenv("TEST_VAR") + + input := "url: ${TEST_VAR}" + output := expandEnvVars(input) + assert.Equal(t, "url: test_value", output) + }) + + t.Run("leaves unexpanded vars when env not set", func(t *testing.T) { + os.Unsetenv("UNDEFINED_VAR") + input := "url: ${UNDEFINED_VAR}" + output := expandEnvVars(input) + assert.Equal(t, "url: ${UNDEFINED_VAR}", output) + }) + + t.Run("expands multiple variables", func(t *testing.T) { + os.Setenv("VAR1", "value1") + os.Setenv("VAR2", "value2") + defer os.Unsetenv("VAR1") + defer os.Unsetenv("VAR2") + + input := "url: ${VAR1}\nkey: ${VAR2}" + output := expandEnvVars(input) + assert.Equal(t, "url: value1\nkey: value2", output) + }) +} + +func TestConnectorConfigGetters(t *testing.T) { + cfg, err := LoadConfig("../../config/custos.yaml") + require.NoError(t, err) + + t.Run("verify connector type field", func(t *testing.T) { + slurmMapper, ok := cfg.Connectors["slurm-mapper"] + require.True(t, ok) + assert.Equal(t, "slurm-association-mapper", slurmMapper.Type) + }) + + t.Run("GetNestedConfig", func(t *testing.T) { + slurmMapper, ok := cfg.Connectors["slurm-mapper"] + require.True(t, ok) + nested, err := slurmMapper.GetNestedConfig("slurm_api") + require.NoError(t, err) + assert.NotNil(t, nested) + assert.Contains(t, nested, "url") + assert.Contains(t, nested, "username") + }) + + t.Run("GetNestedConfig from provisioner", func(t *testing.T) { + provisioner, ok := cfg.Connectors["comanage-provisioner"] + if ok && provisioner != nil { + nested, err := provisioner.GetNestedConfig("registry") + require.NoError(t, err) + assert.NotNil(t, nested) + } + }) +} diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go index 258a6d924..50e1c4948 100644 --- a/internal/connectors/loader.go +++ b/internal/connectors/loader.go @@ -28,6 +28,7 @@ import ( "github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage" "github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper" "github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor" + "github.com/apache/airavata-custos/internal/config" "github.com/apache/airavata-custos/pkg/events" "github.com/apache/airavata-custos/pkg/service" ) @@ -36,24 +37,24 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus *events.Bus slog.Info("loading connectors") slog.Info("loading SLURM Association Mapper connector") - if err := smapper.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil { + if err := smapper.LoadConnector(ctx, database, eventBus, coreService, wg, nil); err != nil { slog.Error("failed to load SLURM Association Mapper connector", "error", err) return err } slog.Info("loading AMIE connector") - if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil { + if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg, nil); err != nil { slog.Error("failed to load AMIE connector", "error", err) return err } slog.Info("loading COmanage Identity-Provisioner connector") - if err := comanage.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil { + if err := comanage.LoadConnector(ctx, database, eventBus, coreService, wg, nil); err != nil { slog.Error("failed to load COmanage Identity-Provisioner connector", "error", err) return err } slog.Info("loading SLURM Usage Monitor connector") - if err := monitor.LoadConnector(ctx, database, eventBus, coreService, wg); err != nil { + if err := monitor.LoadConnector(ctx, database, eventBus, coreService, wg, nil); err != nil { slog.Error("failed to load SLURM Usage Monitor connector", "error", err) return err } @@ -61,3 +62,41 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, eventBus *events.Bus slog.Info("finished loading connectors") return nil } + +func LoadConnectorsFromConfig(ctx context.Context, cfg *config.Config, database *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg *sync.WaitGroup) error { + slog.Info("loading connectors from config") + + connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup, *config.ConnectorConfig) error{ + "slurm-association-mapper": smapper.LoadConnector, + "amie-processor": amie.LoadConnector, + "comanage-identity-provisioner": comanage.LoadConnector, + "slurm-usage-monitor": monitor.LoadConnector, + } + + for connectorName, connectorCfg := range cfg.Connectors { + if connectorCfg == nil { + slog.Debug("connector config is nil", "name", connectorName) + continue + } + + if !connectorCfg.Enabled { + slog.Info("connector is disabled", "name", connectorName, "type", connectorCfg.Type) + continue + } + + loader, ok := connectorLoaders[connectorCfg.Type] + if !ok { + slog.Warn("unknown connector type", "name", connectorName, "type", connectorCfg.Type) + continue + } + + slog.Info("loading connector", "name", connectorName, "type", connectorCfg.Type) + if err := loader(ctx, database, eventBus, coreService, wg, connectorCfg); err != nil { + slog.Error("failed to load connector", "name", connectorName, "type", connectorCfg.Type, "error", err) + return err + } + } + + slog.Info("finished loading connectors from config") + return nil +}
