This is an automated email from the ASF dual-hosted git repository.

DImuthuUpe pushed a commit to branch global-config
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit 98100344ef4f6f5c440d97958ab30ab75f55fba3
Author: DImuthuUpe <[email protected]>
AuthorDate: Fri Jun 12 18:34:18 2026 -0400

    Initial change for global config
---
 CONFIG.md                                          | 242 +++++++++++++++++++++
 cmd/server/main.go                                 | 109 ++++++++++
 config/custos.yaml                                 |  59 +++++
 .../ACCESS/AMIE-Processor/pkg/amie/loader.go       |  79 ++++++-
 .../Identity-Provisioner/pkg/comanage/loader.go    |  90 +++++++-
 .../SLURM/Association-Mapper/pkg/smapper/loader.go |  43 +++-
 .../SLURM/Usage-Monitor/pkg/monitor/loader.go      |  60 ++++-
 dev-ops/compose/Makefile                           |   8 +
 internal/config/config.go                          | 119 ++++++++++
 internal/config/config_test.go                     | 102 +++++++++
 internal/connectors/loader.go                      |  47 +++-
 11 files changed, 921 insertions(+), 37 deletions(-)

diff --git a/CONFIG.md b/CONFIG.md
new file mode 100644
index 000000000..5d2b909df
--- /dev/null
+++ b/CONFIG.md
@@ -0,0 +1,242 @@
+# Configuration Guide
+
+The Custos server can be configured using a YAML configuration file instead of environment variables. This guide explains how to set up and use the configuration system.
+
+## Quick Start
+
+1. Place your configuration file at `config/custos.yaml`
+2. Start the server:
+   ```bash
+   ./custos
+   ```
+
+The server will automatically load the configuration file. If the config file cannot be loaded (for example, because it is not found), it falls back to legacy environment variable configuration.
+
+## Configuration File Location
+
+By default, the server looks for the configuration file at `config/custos.yaml`. You can customize this location using the `CONFIG_PATH` environment variable:
+
+```bash
+CONFIG_PATH=/etc/custos/config.yaml ./custos
+```
+
+## Configuration Structure
+
+### Core Configuration
+
+The `core` section contains essential server settings:
+
+```yaml
+core:
+  database:
+    url: "postgresql://user:password@localhost:5432/custos_db?sslmode=disable"
+  api:
+    port: 8080
+  log_level: "info"
+```
+
+- **database.url**: PostgreSQL connection string (required)
+- **api.port**: HTTP API port (default: 8080)
+- **log_level**: Logging level (info, debug, warn, error)
+
+### Connector Configuration
+
+The `connectors` section defines which connectors are enabled and their settings.
+
+#### SLURM Association Mapper
+
+```yaml
+connectors:
+  slurm-mapper:
+    type: "slurm-association-mapper"
+    enabled: true
+    slurm_api:
+      url: "https://slurm-api.example.com"
+      version: "0.0.38"
+      username: "slurm_admin"
+      token: "${SLURM_TOKEN}"
+```
+
+#### SLURM Usage Monitor
+
+```yaml
+  slurm-usage-monitor:
+    type: "slurm-usage-monitor"
+    enabled: true
+    slurm_api:
+      url: "https://slurm-api.example.com"
+      version: "0.0.38"
+      username: "slurm_admin"
+      token: "${SLURM_TOKEN}"
+    cluster_id: "slurm-cluster"
+```
+
+#### COmanage Identity Provisioner
+
+```yaml
+  comanage-provisioner:
+    type: "comanage-identity-provisioner"
+    enabled: true
+    registry:
+      url: "https://comanage.example.org"
+      co_id: 1
+      api_user: "comanage_api_user"
+      api_key: "${COMANAGE_API_KEY}"
+    unix_cluster:
+      id: 10
+      person_id_type: "eppn"
+    provisioning:
+      custos_cluster_id: "cluster-001"
+      default_shell: "/bin/bash"
+      homedir_prefix: "/home/"
+      http_timeout: "30s"
+```
+
+#### AMIE Processor
+
+```yaml
+  amie-processor:
+    type: "amie-processor"
+    enabled: true
+    credentials:
+      base_url: "https://amie.xsede.org"
+      site_code: "XSEDE"
+      api_key: "${AMIE_API_KEY}"
+    cluster:
+      id: "cluster-001"
+    polling:
+      poll_interval: "30s"
+      worker_interval: "5s"
+      poller_enabled: true
+    timeouts:
+      connect_timeout: "5s"
+```
+
+## Environment Variable Substitution
+
+The configuration parser supports environment variable substitution using the `${VAR_NAME}` syntax. This allows you to:
+
+1. Keep sensitive values (API keys, passwords) out of version control
+2. Use different values for different deployment environments
+3. Reference environment variables directly in the config file
+
+### Example
+
+```yaml
+core:
+  database:
+    url: "${DATABASE_URL}"
+```
+
+In your shell:
+```bash
+export DATABASE_URL="postgresql://user:password@localhost:5432/custos_db"
+./custos
+```
+
+If an environment variable referenced in the config file is not set, the reference will remain unexpanded (the literal `${VAR_NAME}` text is kept):
+
+```yaml
+token: "${MISSING_TOKEN}"  # Will not be replaced if MISSING_TOKEN is not set
+```
+
+## Enabling/Disabling Connectors
+
+The configuration supports any number of connectors with any names. Each connector is identified by its `type` field, which determines which loader is used:
+
+```yaml
+connectors:
+  connector-name:  # Can be any identifier
+    type: "slurm-association-mapper"  # Determines which loader to use
+    enabled: true
+    # Connector-specific configuration
+```
+
+To disable a connector, set `enabled: false`:
+
+```yaml
+connectors:
+  slurm-mapper:
+    type: "slurm-association-mapper"
+    enabled: false
+    # Rest of configuration is still required but will be ignored
+```
+
+When a connector is disabled:
+- It will not be loaded on server startup
+- An info log message will indicate it's disabled
+- The server will continue to function normally
+
+### Supported Connector Types
+
+- `slurm-association-mapper` - SLURM Association Mapper
+- `slurm-usage-monitor` - SLURM Usage Monitor
+- `comanage-identity-provisioner` - COmanage Identity Provisioner
+- `amie-processor` - AMIE Processor
+
+New connector types can be added by:
+1. Implementing the connector package
+2. Adding the loader function to `internal/connectors/loader.go`
+3. Mapping the type to the loader in `connectorLoaders` map
+
+## Fallback to Environment Variables
+
+For backward compatibility, if the configuration file cannot be loaded (for example, because it is not found), the server falls back to the legacy environment variable configuration:
+
+```bash
+DATABASE_DSN="..." HTTP_ADDR=":8080" ./custos
+```
+
+## Configuration Validation
+
+The configuration is validated during parsing. Common issues:
+
+1. **Missing required fields**: If `core.database.url` is missing, the server will fail to start
+2. **Invalid YAML syntax**: Check your YAML indentation and structure
+3. **Undefined environment variables**: Variables referenced with `${VAR}` will be left as-is if not set
+
+## Logging
+
+Configuration loading is logged at the INFO level:
+
+```
+{
+  "time": "2026-06-12T10:30:45Z",
+  "level": "INFO",
+  "msg": "loaded config",
+  "path": "config/custos.yaml"
+}
+```
+
+Connector loading logs indicate which connectors are enabled or disabled:
+
+```
+{
+  "time": "2026-06-12T10:30:46Z",
+  "level": "INFO",
+  "msg": "loading SLURM Association Mapper connector"
+}
+```
+
+## Adding New Connectors
+
+To add a new connector type:
+
+1. Implement the connector package with a `LoadConnector` function
+2. Update `internal/connectors/loader.go` to import the new connector
+3. Add the type-to-loader mapping in the `connectorLoaders` map:
+
+```go
+connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup, *config.ConnectorConfig) error{
+    "my-new-connector": mynewconnector.LoadConnector,
+    // ... existing connectors
+}
+```
+
+4. Add the connector to your config file with the corresponding type
+
+No code changes are required elsewhere — the configuration system is fully extensible.
+
+## Example Complete Configuration
+
+See [config/custos.yaml](config/custos.yaml) for a complete example configuration with all connectors.
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 42b3159dd..f465fcd98 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -30,6 +30,7 @@ import (
        "syscall"
        "time"
 
+       "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/internal/connectors"
        "github.com/apache/airavata-custos/internal/db"
        "github.com/apache/airavata-custos/internal/server"
@@ -47,6 +48,114 @@ func main() {
 }
 
 func run() error {
+       configPath := envDefault("CONFIG_PATH", "config/custos.yaml")
+       cfg, err := config.LoadConfig(configPath)
+       if err != nil {
+               slog.Warn("config file not found, falling back to environment 
variables", "config_path", configPath, "error", err)
+               return runLegacy()
+       }
+
+       slog.Info("loaded config", "path", configPath)
+
+       dsn := cfg.Core.Database.URL
+       if dsn == "" {
+               return errors.New("database.url in config is required")
+       }
+
+       port := cfg.Core.API.Port
+       if port == 0 {
+               port = 8080
+       }
+       addr := ":" + strconv.Itoa(port)
+
+       maxOpen := envInt("DB_MAX_OPEN_CONNS", 25)
+       maxIdle := envInt("DB_MAX_IDLE_CONNS", 5)
+
+       database, err := db.Open(db.Config{
+               DSN:          dsn,
+               MaxOpenConns: maxOpen,
+               MaxIdleConns: maxIdle,
+       })
+       if err != nil {
+               return err
+       }
+       defer database.Close()
+
+       // Core schema must run before connector migrations because connector
+       // schemas may FK into core tables.
+       if err := db.MigrateEmbedded(database); err != nil {
+               return err
+       }
+
+       // Create a new event bus instance to async messaging between service 
and connectors
+       eventBus := events.New()
+       svc := service.New(database, eventBus)
+
+       ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, 
syscall.SIGTERM)
+       defer stop()
+
+       tryBootstrap(ctx, svc)
+
+       // Tracks every background goroutine spawned by connectors so we can 
wait
+       // for them to drain on shutdown instead of killing them mid-flight.
+       var connectorsWG sync.WaitGroup
+       if err := connectors.LoadConnectorsFromConfig(ctx, cfg, database, 
eventBus, svc, &connectorsWG); err != nil {
+               return err
+       }
+
+       handler := server.LoggingMiddleware(server.New(svc))
+
+       httpServer := &http.Server{
+               Addr:              addr,
+               Handler:           handler,
+               ReadHeaderTimeout: 10 * time.Second,
+               ReadTimeout:       30 * time.Second,
+               WriteTimeout:      30 * time.Second,
+               IdleTimeout:       120 * time.Second,
+       }
+
+       serverErr := make(chan error, 1)
+       go func() {
+               slog.Info("http server listening", "addr", addr)
+               if err := httpServer.ListenAndServe(); err != nil && 
!errors.Is(err, http.ErrServerClosed) {
+                       serverErr <- err
+               }
+               close(serverErr)
+       }()
+
+       select {
+       case <-ctx.Done():
+               slog.Info("shutdown signal received")
+       case err := <-serverErr:
+               if err != nil {
+                       return err
+               }
+       }
+
+       shutdownCtx, cancel := context.WithTimeout(context.Background(), 
15*time.Second)
+       defer cancel()
+       if err := httpServer.Shutdown(shutdownCtx); err != nil {
+               return err
+       }
+
+       slog.Info("waiting for connectors to drain")
+       connectorsDone := make(chan struct{})
+       go func() {
+               connectorsWG.Wait()
+               close(connectorsDone)
+       }()
+       select {
+       case <-connectorsDone:
+               slog.Info("connectors drained cleanly")
+       case <-time.After(30 * time.Second):
+               slog.Warn("connector drain timed out; some workers may have 
leaked")
+       }
+
+       slog.Info("server stopped cleanly")
+       return nil
+}
+
+func runLegacy() error {
        dsn := os.Getenv("DATABASE_DSN")
        if dsn == "" {
                return errors.New("DATABASE_DSN environment variable is 
required " +
diff --git a/config/custos.yaml b/config/custos.yaml
new file mode 100644
index 000000000..21b67f903
--- /dev/null
+++ b/config/custos.yaml
@@ -0,0 +1,59 @@
+core:
+  database:
+    url: "postgresql://custos_user:password@localhost:5432/custos_db?sslmode=disable"
+  api:
+    port: 8080
+  log_level: "info"
+
+connectors:
+  slurm-mapper:
+    type: "slurm-association-mapper"
+    enabled: true
+    slurm_api:
+      url: "https://slurm-api.example.com"
+      version: "0.0.38"
+      username: "slurm_admin"
+      token: "${SLURM_TOKEN}" # Reference to environment variable
+
+  slurm-usage-monitor:
+    type: "slurm-usage-monitor"
+    enabled: true
+    slurm_api:
+      url: "https://slurm-api.example.com"
+      version: "0.0.38"
+      username: "slurm_admin"
+      token: "${SLURM_TOKEN}" # Reference to environment variable
+    cluster_id: "slurm-cluster"
+
+  comanage-provisioner:
+    type: "comanage-identity-provisioner"
+    enabled: true
+    registry:
+      url: "https://comanage.example.org"
+      co_id: 1
+      api_user: "comanage_api_user"
+      api_key: "${COMANAGE_API_KEY}" # Reference to environment variable
+    unix_cluster:
+      id: 10
+      person_id_type: "eppn" # Educational Person Principal Name
+    provisioning:
+      custos_cluster_id: "cluster-001"
+      default_shell: "/bin/bash"
+      homedir_prefix: "/home/"
+      http_timeout: "30s"
+
+  amie-processor:
+    type: "amie-processor"
+    enabled: true
+    credentials:
+      base_url: "https://amie.xsede.org"
+      site_code: "XSEDE"
+      api_key: "${AMIE_API_KEY}" # Reference to environment variable
+    cluster:
+      id: "cluster-001" # Must match a cluster registered in Custos
+    polling:
+      poll_interval: "30s"
+      worker_interval: "5s"
+      poller_enabled: true
+    timeouts:
+      connect_timeout: "5s"
diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go 
b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
index 3b95e9abe..172c8607f 100644
--- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
+++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go
@@ -36,6 +36,7 @@ import (
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service"
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store"
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/worker"
+       custosconfig "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/internal/db"
        "github.com/apache/airavata-custos/pkg/events"
        coreservice "github.com/apache/airavata-custos/pkg/service"
@@ -45,8 +46,8 @@ const connectorName = "amie"
 
 // LoadConnector skips silently when AMIE_BASE_URL / AMIE_SITE_CODE /
 // AMIE_API_KEY are not all set.
-func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup) error {
-       cfg := loadConfig()
+func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus 
*events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup, 
connectorConfig *custosconfig.ConnectorConfig) error {
+       cfg := loadConfig(connectorConfig)
        if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode 
== "" {
                slog.Warn("AMIE credentials not fully provided, skipping AMIE 
connector")
                return nil
@@ -108,16 +109,72 @@ func LoadConnector(ctx context.Context, database 
*sqlx.DB, eventBus *events.Bus,
        return nil
 }
 
-func loadConfig() *config.Config {
+func loadConfig(connectorConfig *custosconfig.ConnectorConfig) *config.Config {
        cfg := &config.Config{}
-       cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL")
-       cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE")
-       cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY")
-       cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 
30*time.Second)
-       cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 
5*time.Second)
-       cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 
5*time.Second)
-       cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second)
-       cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", true)
+
+       // Load from connector config if available
+       if connectorConfig != nil {
+               if credentials, err := 
connectorConfig.GetNestedConfig("credentials"); err == nil {
+                       if url, ok := credentials["base_url"].(string); ok {
+                               cfg.AMIE.BaseURL = url
+                       }
+                       if code, ok := credentials["site_code"].(string); ok {
+                               cfg.AMIE.SiteCode = code
+                       }
+                       if key, ok := credentials["api_key"].(string); ok {
+                               cfg.AMIE.APIKey = key
+                       }
+               }
+
+               if polling, err := connectorConfig.GetNestedConfig("polling"); 
err == nil {
+                       if interval, ok := polling["poll_interval"].(string); 
ok {
+                               if d, err := time.ParseDuration(interval); err 
== nil {
+                                       cfg.AMIE.PollInterval = d
+                               }
+                       }
+                       if interval, ok := polling["worker_interval"].(string); 
ok {
+                               if d, err := time.ParseDuration(interval); err 
== nil {
+                                       cfg.AMIE.WorkerInterval = d
+                               }
+                       }
+                       if enabled, ok := polling["poller_enabled"].(bool); ok {
+                               cfg.AMIE.PollerEnabled = enabled
+                       }
+               }
+
+               if timeouts, err := 
connectorConfig.GetNestedConfig("timeouts"); err == nil {
+                       if timeout, ok := timeouts["connect_timeout"].(string); 
ok {
+                               if d, err := time.ParseDuration(timeout); err 
== nil {
+                                       cfg.AMIE.ConnectTimeout = d
+                               }
+                       }
+               }
+       }
+
+       // Fall back to environment variables
+       if cfg.AMIE.BaseURL == "" {
+               cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL")
+       }
+       if cfg.AMIE.SiteCode == "" {
+               cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE")
+       }
+       if cfg.AMIE.APIKey == "" {
+               cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY")
+       }
+       if cfg.AMIE.PollInterval == 0 {
+               cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 
30*time.Second)
+       }
+       if cfg.AMIE.WorkerInterval == 0 {
+               cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 
5*time.Second)
+       }
+       if cfg.AMIE.ConnectTimeout == 0 {
+               cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 
5*time.Second)
+       }
+       if cfg.AMIE.ReadTimeout == 0 {
+               cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 
20*time.Second)
+       }
+       cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", 
cfg.AMIE.PollerEnabled)
+
        return cfg
 }
 
diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go 
b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
index 9c9628dce..d2c712148 100644
--- a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
+++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
@@ -31,17 +31,21 @@ import (
 
        
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
        
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers"
+       "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
 
 // LoadConnector wires the subscriber to the event bus. If any required env
 // var is missing, the loader logs and returns nil without registering.
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup) error {
-       cfg, ok := loadConfigFromEnv()
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup, connectorConfig 
*config.ConnectorConfig) error {
+       cfg, ok := loadConfigFromConnectorConfig(connectorConfig)
        if !ok {
-               slog.Info("comanage provisioner: required env vars not set; 
skipping")
-               return nil
+               cfg, ok = loadConfigFromEnv()
+               if !ok {
+                       slog.Info("comanage provisioner: required config not 
set; skipping")
+                       return nil
+               }
        }
        httpClient := client.New(cfg)
        subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService, 
cfg.CustosClusterID).RegisterSubscribers()
@@ -54,6 +58,84 @@ func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus 
*events.Bus, coreServ
        return nil
 }
 
+func loadConfigFromConnectorConfig(connectorConfig *config.ConnectorConfig) 
(client.Config, bool) {
+       if connectorConfig == nil {
+               return client.Config{}, false
+       }
+
+       var registryURL, apiUser, apiKey, personIDType, custosCluster, 
defaultShell, homedirPrefix string
+       var coID, unixClusterID int
+       timeout := 30 * time.Second
+
+       // Load registry config
+       if registry, err := connectorConfig.GetNestedConfig("registry"); err == 
nil {
+               if url, ok := registry["url"].(string); ok {
+                       registryURL = url
+               }
+               if u, ok := registry["api_user"].(string); ok {
+                       apiUser = u
+               }
+               if k, ok := registry["api_key"].(string); ok {
+                       apiKey = k
+               }
+               if id, ok := registry["co_id"].(float64); ok {
+                       coID = int(id)
+               }
+       }
+
+       // Load unix_cluster config
+       if unixCluster, err := connectorConfig.GetNestedConfig("unix_cluster"); 
err == nil {
+               if id, ok := unixCluster["id"].(float64); ok {
+                       unixClusterID = int(id)
+               }
+               if pType, ok := unixCluster["person_id_type"].(string); ok {
+                       personIDType = pType
+               }
+       }
+
+       // Load provisioning config
+       if provisioning, err := 
connectorConfig.GetNestedConfig("provisioning"); err == nil {
+               if id, ok := provisioning["custos_cluster_id"].(string); ok {
+                       custosCluster = id
+               }
+               if shell, ok := provisioning["default_shell"].(string); ok {
+                       defaultShell = shell
+               }
+               if prefix, ok := provisioning["homedir_prefix"].(string); ok {
+                       homedirPrefix = prefix
+               }
+               if timeoutStr, ok := provisioning["http_timeout"].(string); ok {
+                       if d, err := time.ParseDuration(timeoutStr); err == nil 
{
+                               timeout = d
+                       }
+               }
+       }
+
+       if registryURL == "" || coID == 0 || apiUser == "" || apiKey == "" || 
personIDType == "" || unixClusterID == 0 || custosCluster == "" {
+               return client.Config{}, false
+       }
+
+       if defaultShell == "" {
+               defaultShell = "/bin/bash"
+       }
+       if homedirPrefix == "" {
+               homedirPrefix = "/home/"
+       }
+
+       return client.Config{
+               RegistryURL:     registryURL,
+               COID:            coID,
+               APIUser:         apiUser,
+               APIKey:          apiKey,
+               PersonIDType:    personIDType,
+               UnixClusterID:   unixClusterID,
+               CustosClusterID: custosCluster,
+               DefaultShell:    defaultShell,
+               HomedirPrefix:   homedirPrefix,
+               HTTPTimeout:     timeout,
+       }, true
+}
+
 func loadConfigFromEnv() (client.Config, bool) {
        registryURL := os.Getenv("COMANAGE_REGISTRY_URL")
        coIDStr := os.Getenv("COMANAGE_CO_ID")
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go 
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 66aba8e6a..4fcf1a03b 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -10,17 +10,48 @@ import (
 
        
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
        
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+       "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
 
-func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup) error {
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup, connectorConfig 
*config.ConnectorConfig) error {
+
+       // Read url, username, and password from config or environment variables
+       var apiUrl, user, token, apiVersion string
+
+       if connectorConfig != nil {
+               slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api")
+               if err == nil {
+                       if url, ok := slurmAPI["url"].(string); ok {
+                               apiUrl = url
+                       }
+                       if u, ok := slurmAPI["username"].(string); ok {
+                               user = u
+                       }
+                       if t, ok := slurmAPI["token"].(string); ok {
+                               token = t
+                       }
+                       if v, ok := slurmAPI["version"].(string); ok {
+                               apiVersion = v
+                       }
+               }
+       }
+
+       // Fall back to environment variables
+       if apiUrl == "" {
+               apiUrl = os.Getenv("SLURM_API")
+       }
+       if user == "" {
+               user = os.Getenv("SLURM_USER")
+       }
+       if token == "" {
+               token = os.Getenv("SLURM_TOKEN")
+       }
+       if apiVersion == "" {
+               apiVersion = os.Getenv("SLURM_API_VERSION")
+       }
 
-       // Read url, username, and password from environment variables
-       apiUrl := os.Getenv("SLURM_API")
-       user := os.Getenv("SLURM_USER")
-       token := os.Getenv("SLURM_TOKEN")
-       apiVersion := os.Getenv("SLURM_API_VERSION")
        if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
                slog.Info("SLURM API credentials not fully provided, skipping 
SLURM Association Mapper connector")
                slog.Info("SLURM API credentials", "apiUrl", apiUrl, "user", 
user, "token", token, "apiVersion", apiVersion)
diff --git a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go 
b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
index e8c314f60..c698d5d4a 100644
--- a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
+++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
@@ -8,23 +8,59 @@ import (
 
        
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
        
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor"
+       "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
        "github.com/jmoiron/sqlx"
 )
 
-func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, wg *sync.WaitGroup) error {
-
-       // Read url, username, and password from environment variables
-       apiUrl := os.Getenv("SLURM_API")
-       user := os.Getenv("SLURM_USER")
-       token := os.Getenv("SLURM_TOKEN")
-       apiVersion := os.Getenv("SLURM_API_VERSION")
-       monitorClusterID := os.Getenv("SLURM_MONITOR_CLUSTER_ID")
-       if monitorClusterID == "" {
-               slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting to 
'slurm-cluster'")
-               monitorClusterID = "slurm-cluster"
+func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, wg *sync.WaitGroup, connectorConfig 
*config.ConnectorConfig) error {
+
+       // Read url, username, and password from config or environment variables
+       var apiUrl, user, token, apiVersion, clusterID string
+
+       if connectorConfig != nil {
+               slurmAPI, err := connectorConfig.GetNestedConfig("slurm_api")
+               if err == nil {
+                       if url, ok := slurmAPI["url"].(string); ok {
+                               apiUrl = url
+                       }
+                       if u, ok := slurmAPI["username"].(string); ok {
+                               user = u
+                       }
+                       if t, ok := slurmAPI["token"].(string); ok {
+                               token = t
+                       }
+                       if v, ok := slurmAPI["version"].(string); ok {
+                               apiVersion = v
+                       }
+               }
+               if id, ok := connectorConfig.Config["cluster_id"].(string); ok {
+                       clusterID = id
+               }
+       }
+
+       // Fall back to environment variables
+       if apiUrl == "" {
+               apiUrl = os.Getenv("SLURM_API")
+       }
+       if user == "" {
+               user = os.Getenv("SLURM_USER")
        }
+       if token == "" {
+               token = os.Getenv("SLURM_TOKEN")
+       }
+       if apiVersion == "" {
+               apiVersion = os.Getenv("SLURM_API_VERSION")
+       }
+       if clusterID == "" {
+               clusterID = os.Getenv("SLURM_MONITOR_CLUSTER_ID")
+               if clusterID == "" {
+                       slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting 
to 'slurm-cluster'")
+                       clusterID = "slurm-cluster"
+               }
+       }
+
        if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
                slog.Warn("SLURM API credentials not fully provided, skipping 
SLURM Usage Monitor connector")
                slog.Warn("SLURM API credentials", "apiUrl", apiUrl, "user", 
user, "token", token, "apiVersion", apiVersion)
@@ -32,7 +68,7 @@ func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus 
*events.Bus, coreSe
        }
 
        slurmClient := client.New(apiUrl, user, token, apiVersion)
-       monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService, 
monitorClusterID)
+       monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService, 
clusterID)
        wg.Add(1)
        go func() {
                defer wg.Done()
diff --git a/dev-ops/compose/Makefile b/dev-ops/compose/Makefile
new file mode 100644
index 000000000..fcea29a68
--- /dev/null
+++ b/dev-ops/compose/Makefile
@@ -0,0 +1,8 @@
+SHELL := /bin/bash
+.PHONY: base up down build cli test test-integration smoke lint keys logs
+
+up: base
+       docker compose up -d --build
+
+down:
+       docker compose down -v
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 000000000..bbffcfb92
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+       "fmt"
+       "os"
+       "regexp"
+       "time"
+
+       "gopkg.in/yaml.v3"
+)
+
+// Config is the root of the YAML configuration document: the core server
+// settings under `core` and the connector entries under `connectors`, keyed
+// by connector name.
+type Config struct {
+       Core       CoreConfig `yaml:"core"`
+       Connectors map[string]*ConnectorConfig `yaml:"connectors"`
+}
+
+// CoreConfig holds the values read from the `core` section: the `database`
+// and `api` sub-sections and the `log_level` string.
+type CoreConfig struct {
+       Database DatabaseConfig `yaml:"database"`
+       API      APIConfig      `yaml:"api"`
+       LogLevel string         `yaml:"log_level"`
+}
+
+// DatabaseConfig holds the `core.database` section; URL is read from its
+// `url` key.
+type DatabaseConfig struct {
+       URL string `yaml:"url"`
+}
+
+// APIConfig holds the `core.api` section; Port is read from its `port` key.
+type APIConfig struct {
+       Port int `yaml:"port"`
+}
+
+// ConnectorConfig describes a single entry under `connectors`.
+//
+// Type and Enabled are decoded from the `type` and `enabled` keys. Every
+// other key of the entry is collected into Config through the yaml ",inline"
+// option and is read back with the Get* accessor methods.
+type ConnectorConfig struct {
+       Type    string                 `yaml:"type"`
+       Enabled bool                   `yaml:"enabled"`
+       Config  map[string]interface{} `yaml:",inline"`
+}
+
+func LoadConfig(path string) (*Config, error) {
+       data, err := os.ReadFile(path)
+       if err != nil {
+               return nil, fmt.Errorf("failed to read config file: %w", err)
+       }
+
+       expandedData := expandEnvVars(string(data))
+
+       var cfg Config
+       if err := yaml.Unmarshal([]byte(expandedData), &cfg); err != nil {
+               return nil, fmt.Errorf("failed to parse config file: %w", err)
+       }
+
+       return &cfg, nil
+}
+
+// expandEnvVars replaces every ${NAME} reference in input with the value of
+// the environment variable NAME. A reference whose variable is not set is
+// left in the text unchanged; a variable set to the empty string expands to
+// nothing. Bare $NAME references (without braces) are not touched.
+func expandEnvVars(input string) string {
+       placeholder := regexp.MustCompile(`\$\{([^}]+)\}`)
+
+       resolve := func(ref string) string {
+               // ref is exactly one "${...}" match, so group 1 is the name.
+               name := placeholder.FindStringSubmatch(ref)[1]
+               value, isSet := os.LookupEnv(name)
+               if !isSet {
+                       return ref
+               }
+               return value
+       }
+
+       return placeholder.ReplaceAllStringFunc(input, resolve)
+}
+
+// GetStringField returns the connector-specific field key as a string.
+// It returns an error when the key is absent from Config or when the stored
+// value is not a string.
+func (c *ConnectorConfig) GetStringField(key string) (string, error) {
+       raw, present := c.Config[key]
+       if !present {
+               return "", fmt.Errorf("field %s not found", key)
+       }
+
+       text, isString := raw.(string)
+       if !isString {
+               return "", fmt.Errorf("field %s is not a string", key)
+       }
+       return text, nil
+}
+
+// GetIntField returns the connector-specific field key as an int.
+// It returns an error when the key is absent from Config or when the stored
+// value does not have the dynamic type int.
+func (c *ConnectorConfig) GetIntField(key string) (int, error) {
+       raw, present := c.Config[key]
+       if !present {
+               return 0, fmt.Errorf("field %s not found", key)
+       }
+
+       number, isInt := raw.(int)
+       if !isInt {
+               return 0, fmt.Errorf("field %s is not an integer", key)
+       }
+       return number, nil
+}
+
+// GetDurationField returns the connector-specific field key parsed as a
+// time.Duration. The stored value must be a string in the syntax accepted by
+// time.ParseDuration (for example "30s" or "5m").
+//
+// It returns an error when the key is absent, when the value is not a string,
+// or when the string cannot be parsed. Parse failures are wrapped so the
+// message names the offending field while the underlying error stays
+// reachable through errors.Unwrap.
+func (c *ConnectorConfig) GetDurationField(key string) (time.Duration, error) {
+       val, ok := c.Config[key]
+       if !ok {
+               return 0, fmt.Errorf("field %s not found", key)
+       }
+
+       str, ok := val.(string)
+       if !ok {
+               return 0, fmt.Errorf("field %s is not a string", key)
+       }
+
+       d, err := time.ParseDuration(str)
+       if err != nil {
+               return 0, fmt.Errorf("field %s is not a valid duration: %w", key, err)
+       }
+       return d, nil
+}
+
+// GetNestedConfig returns the connector-specific field key as a nested
+// string-keyed map. It returns an error when the key is absent from Config or
+// when the stored value is not a map[string]interface{}.
+func (c *ConnectorConfig) GetNestedConfig(key string) (map[string]interface{}, error) {
+       raw, present := c.Config[key]
+       if !present {
+               return nil, fmt.Errorf("field %s not found", key)
+       }
+
+       section, isMap := raw.(map[string]interface{})
+       if !isMap {
+               return nil, fmt.Errorf("field %s is not a map", key)
+       }
+       return section, nil
+}
diff --git a/internal/config/config_test.go b/internal/config/config_test.go
new file mode 100644
index 000000000..787fbe078
--- /dev/null
+++ b/internal/config/config_test.go
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package config
+
+import (
+       "os"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// TestLoadConfig loads the repository's config/custos.yaml and checks the
+// core settings and the "slurm-mapper" connector entry it is expected to
+// contain.
+func TestLoadConfig(t *testing.T) {
+       loaded, loadErr := LoadConfig("../../config/custos.yaml")
+       require.NoError(t, loadErr)
+
+       core := loaded.Core
+       assert.Equal(t, "info", core.LogLevel)
+       assert.Equal(t, 8080, core.API.Port)
+       assert.NotEmpty(t, core.Database.URL)
+
+       connectors := loaded.Connectors
+       assert.NotNil(t, connectors)
+       assert.Greater(t, len(connectors), 0)
+
+       mapper, found := connectors["slurm-mapper"]
+       assert.True(t, found)
+       assert.True(t, mapper.Enabled)
+       assert.Equal(t, "slurm-association-mapper", mapper.Type)
+}
+
+// TestExpandEnvVars checks ${VAR} substitution: a set variable is replaced,
+// an unset one is left verbatim, and several references expand independently.
+//
+// Environment changes go through t.Setenv so that any value the variable had
+// before the test is restored when the subtest finishes.
+func TestExpandEnvVars(t *testing.T) {
+       t.Run("expands environment variables", func(t *testing.T) {
+               t.Setenv("TEST_VAR", "test_value")
+
+               input := "url: ${TEST_VAR}"
+               output := expandEnvVars(input)
+               assert.Equal(t, "url: test_value", output)
+       })
+
+       t.Run("leaves unexpanded vars when env not set", func(t *testing.T) {
+               // t.Setenv registers restoration of the prior state; the
+               // variable is then removed for the duration of the subtest.
+               t.Setenv("UNDEFINED_VAR", "")
+               require.NoError(t, os.Unsetenv("UNDEFINED_VAR"))
+
+               input := "url: ${UNDEFINED_VAR}"
+               output := expandEnvVars(input)
+               assert.Equal(t, "url: ${UNDEFINED_VAR}", output)
+       })
+
+       t.Run("expands multiple variables", func(t *testing.T) {
+               t.Setenv("VAR1", "value1")
+               t.Setenv("VAR2", "value2")
+
+               input := "url: ${VAR1}\nkey: ${VAR2}"
+               output := expandEnvVars(input)
+               assert.Equal(t, "url: value1\nkey: value2", output)
+       })
+}
+
+// TestConnectorConfigGetters loads config/custos.yaml and exercises the
+// ConnectorConfig accessors against the "slurm-mapper" entry and, when it is
+// present, the "comanage-provisioner" entry.
+func TestConnectorConfigGetters(t *testing.T) {
+       loaded, loadErr := LoadConfig("../../config/custos.yaml")
+       require.NoError(t, loadErr)
+
+       t.Run("verify connector type field", func(t *testing.T) {
+               mapper, found := loaded.Connectors["slurm-mapper"]
+               require.True(t, found)
+               assert.Equal(t, "slurm-association-mapper", mapper.Type)
+       })
+
+       t.Run("GetNestedConfig", func(t *testing.T) {
+               mapper, found := loaded.Connectors["slurm-mapper"]
+               require.True(t, found)
+
+               section, nestedErr := mapper.GetNestedConfig("slurm_api")
+               require.NoError(t, nestedErr)
+               assert.NotNil(t, section)
+               for _, wantKey := range []string{"url", "username"} {
+                       assert.Contains(t, section, wantKey)
+               }
+       })
+
+       t.Run("GetNestedConfig from provisioner", func(t *testing.T) {
+               // This entry is optional: nothing is asserted when it is absent.
+               entry, found := loaded.Connectors["comanage-provisioner"]
+               if !found || entry == nil {
+                       return
+               }
+
+               section, nestedErr := entry.GetNestedConfig("registry")
+               require.NoError(t, nestedErr)
+               assert.NotNil(t, section)
+       })
+}
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 258a6d924..50e1c4948 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -28,6 +28,7 @@ import (
        
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage"
        
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
        
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor"
+       "github.com/apache/airavata-custos/internal/config"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
 )
@@ -36,24 +37,24 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, 
eventBus *events.Bus
        slog.Info("loading connectors")
 
        slog.Info("loading SLURM Association Mapper connector")
-       if err := smapper.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
+       if err := smapper.LoadConnector(ctx, database, eventBus, coreService, 
wg, nil); err != nil {
                slog.Error("failed to load SLURM Association Mapper connector", 
"error", err)
                return err
        }
 
        slog.Info("loading AMIE connector")
-       if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg); 
err != nil {
+       if err := amie.LoadConnector(ctx, database, eventBus, coreService, wg, 
nil); err != nil {
                slog.Error("failed to load AMIE connector", "error", err)
                return err
        }
        slog.Info("loading COmanage Identity-Provisioner connector")
-       if err := comanage.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
+       if err := comanage.LoadConnector(ctx, database, eventBus, coreService, 
wg, nil); err != nil {
                slog.Error("failed to load COmanage Identity-Provisioner 
connector", "error", err)
                return err
        }
 
        slog.Info("loading SLURM Usage Monitor connector")
-       if err := monitor.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
+       if err := monitor.LoadConnector(ctx, database, eventBus, coreService, 
wg, nil); err != nil {
                slog.Error("failed to load SLURM Usage Monitor connector", 
"error", err)
                return err
        }
@@ -61,3 +62,41 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, 
eventBus *events.Bus
        slog.Info("finished loading connectors")
        return nil
 }
+
+func LoadConnectorsFromConfig(ctx context.Context, cfg *config.Config, 
database *sqlx.DB, eventBus *events.Bus, coreService *service.Service, wg 
*sync.WaitGroup) error {
+       slog.Info("loading connectors from config")
+
+       connectorLoaders := map[string]func(context.Context, *sqlx.DB, 
*events.Bus, *service.Service, *sync.WaitGroup, *config.ConnectorConfig) error{
+               "slurm-association-mapper":     smapper.LoadConnector,
+               "amie-processor":               amie.LoadConnector,
+               "comanage-identity-provisioner": comanage.LoadConnector,
+               "slurm-usage-monitor":          monitor.LoadConnector,
+       }
+
+       for connectorName, connectorCfg := range cfg.Connectors {
+               if connectorCfg == nil {
+                       slog.Debug("connector config is nil", "name", 
connectorName)
+                       continue
+               }
+
+               if !connectorCfg.Enabled {
+                       slog.Info("connector is disabled", "name", 
connectorName, "type", connectorCfg.Type)
+                       continue
+               }
+
+               loader, ok := connectorLoaders[connectorCfg.Type]
+               if !ok {
+                       slog.Warn("unknown connector type", "name", 
connectorName, "type", connectorCfg.Type)
+                       continue
+               }
+
+               slog.Info("loading connector", "name", connectorName, "type", 
connectorCfg.Type)
+               if err := loader(ctx, database, eventBus, coreService, wg, 
connectorCfg); err != nil {
+                       slog.Error("failed to load connector", "name", 
connectorName, "type", connectorCfg.Type, "error", err)
+                       return err
+               }
+       }
+
+       slog.Info("finished loading connectors from config")
+       return nil
+}


Reply via email to