This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch amie-config in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit ff17b6abbd561baca13336bef5da0723e324e6c1 Author: lahiruj <[email protected]> AuthorDate: Sun Jun 14 23:37:31 2026 -0400 Fix global configuration loader and remove redundant AMIE config package --- .gitignore | 3 + CONFIG.md | 19 +-- Makefile | 1 + cmd/server/main.go | 17 +++ config/custos.yaml | 3 +- .../ACCESS/AMIE-Processor/amieclient/client.go | 16 ++- .../AMIE-Processor/amieclient/client_test.go | 3 +- connectors/ACCESS/AMIE-Processor/config/config.go | 137 --------------------- .../AMIE-Processor/pipeline/integration_common.go | 3 +- .../ACCESS/AMIE-Processor/pkg/amie/loader.go | 85 ++++++++----- connectors/ACCESS/AMIE-Processor/worker/poller.go | 4 +- .../worker/poller_integration_test.go | 6 +- .../ACCESS/AMIE-Processor/worker/processor.go | 4 +- .../worker/processor_integration_test.go | 4 +- .../Identity-Provisioner/pkg/comanage/loader.go | 28 +++-- docs/API-Docs.md | 41 ++++-- 16 files changed, 151 insertions(+), 223 deletions(-) diff --git a/.gitignore b/.gitignore index d24da884a..fe0f87872 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,9 @@ com_crashlytics_export_strings.xml node_modules/ dist/ +# Built server binary from `make build`. +/custos + database_data # local application properties diff --git a/CONFIG.md b/CONFIG.md index 5d2b909df..ccd13dd57 100644 --- a/CONFIG.md +++ b/CONFIG.md @@ -10,7 +10,7 @@ Custos server can be configured using a YAML configuration file instead of envir ./custos ``` -The server will automatically load the configuration file. If the config file is not found, it falls back to legacy environment variable configuration. +The server will automatically load the configuration file. If the config file is not found at the resolved path, the server exits with an error — make sure `config/custos.yaml` exists (or override `CONFIG_PATH`). ## Configuration File Location @@ -29,13 +29,13 @@ The `core` section contains essential server settings: ```yaml core: database: - url: "postgresql://user:password@localhost:5432/custos_db?sslmode=disable" + url: "admin:admin@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4" api: port: 8080 log_level: "info" ``` -- **database.url**: PostgreSQL connection string (required) +- **database.url**: MariaDB / MySQL DSN (the server uses the `go-sql-driver/mysql` driver). Required. - **api.port**: HTTP API port (default: 8080) - **log_level**: Logging level (info, debug, warn, error) @@ -110,6 +110,7 @@ connectors: poller_enabled: true timeouts: connect_timeout: "5s" + read_timeout: "20s" ``` ## Environment Variable Substitution @@ -130,7 +131,7 @@ core: In your shell: ```bash -export DATABASE_URL="postgresql://user:password@localhost:5432/custos_db" +export DATABASE_URL="admin:admin@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4" ./custos ``` @@ -179,14 +180,6 @@ New connector types can be added by: 2. Adding the loader function to `internal/connectors/loader.go` 3. Mapping the type to the loader in `connectorLoaders` map -## Fallback to Environment Variables - -For backward compatibility, if the configuration file is not found, the server falls back to the legacy environment variable configuration: - -```bash -DATABASE_DSN="..." HTTP_ADDR=":8080" ./custos -``` - ## Configuration Validation The configuration is validated during parsing. Common issues: @@ -227,7 +220,7 @@ To add a new connector type: 3. Add the type-to-loader mapping in the `connectorLoaders` map: ```go -connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup) error{ +connectorLoaders := map[string]func(context.Context, *sqlx.DB, *events.Bus, *service.Service, *sync.WaitGroup, *http.ServeMux, *config.ConnectorConfig) error{ "my-new-connector": mynewconnector.LoadConnector, // ... existing connectors } diff --git a/Makefile b/Makefile index 8eb569f9c..0fbcb7e6f 100644 --- a/Makefile +++ b/Makefile @@ -16,6 +16,7 @@ gen-api: go generate ./... build: gen-api + go build -o custos ./cmd/server go build ./... verify-no-drift: gen-api diff --git a/cmd/server/main.go b/cmd/server/main.go index b708e8d7c..7d75f2286 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -29,6 +29,7 @@ import ( "os" "os/signal" "strconv" + "strings" "sync" "syscall" "time" @@ -68,6 +69,7 @@ func run() error { return errors.New("failed to load config: " + err.Error()) } + applyLogLevel(cfg.Core.LogLevel) slog.Info("loaded config", "path", configPath) dsn := cfg.Core.Database.URL @@ -212,3 +214,18 @@ func envInt(key string, fallback int) int { } return n } + +func applyLogLevel(level string) { + var lvl slog.Level + switch strings.ToLower(strings.TrimSpace(level)) { + case "debug": + lvl = slog.LevelDebug + case "warn", "warning": + lvl = slog.LevelWarn + case "error": + lvl = slog.LevelError + default: + lvl = slog.LevelInfo + } + slog.SetDefault(slog.New(tracing.SlogHandler(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: lvl})))) +} diff --git a/config/custos.yaml b/config/custos.yaml index 21b67f903..11508d05e 100644 --- a/config/custos.yaml +++ b/config/custos.yaml @@ -1,6 +1,6 @@ core: database: - url: "postgresql://custos_user:password@localhost:5432/custos_db?sslmode=disable" + url: "admin:admin@tcp(localhost:3306)/custos?parseTime=true&charset=utf8mb4" api: port: 8080 log_level: "info" @@ -57,3 +57,4 @@ connectors: poller_enabled: true timeouts: connect_timeout: "5s" + read_timeout: "20s" diff --git a/connectors/ACCESS/AMIE-Processor/amieclient/client.go b/connectors/ACCESS/AMIE-Processor/amieclient/client.go index d8378e46f..bd8734a3e 100644 --- a/connectors/ACCESS/AMIE-Processor/amieclient/client.go +++ b/connectors/ACCESS/AMIE-Processor/amieclient/client.go @@ -26,10 +26,20 @@ import ( "log/slog" "net/http" "strings" - - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" + "time" ) +type Config struct { + BaseURL string + SiteCode string + APIKey string + PollInterval time.Duration + WorkerInterval time.Duration + ConnectTimeout time.Duration + ReadTimeout time.Duration + PollerEnabled bool +} + type Client struct { httpClient *http.Client baseURL string @@ -37,7 +47,7 @@ type Client struct { apiKey string } -func New(cfg config.AMIEConfig) *Client { +func New(cfg Config) *Client { return &Client{ httpClient: &http.Client{ Timeout: cfg.ReadTimeout, diff --git a/connectors/ACCESS/AMIE-Processor/amieclient/client_test.go b/connectors/ACCESS/AMIE-Processor/amieclient/client_test.go index 7af477613..69ad602f8 100644 --- a/connectors/ACCESS/AMIE-Processor/amieclient/client_test.go +++ b/connectors/ACCESS/AMIE-Processor/amieclient/client_test.go @@ -26,14 +26,13 @@ import ( "time" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // newTestClient builds a Client pointed at the given httptest.Server URL. func newTestClient(serverURL string) *amieclient.Client { - return amieclient.New(config.AMIEConfig{ + return amieclient.New(amieclient.Config{ BaseURL: serverURL, SiteCode: "TESTSITE", APIKey: "test-key", diff --git a/connectors/ACCESS/AMIE-Processor/config/config.go b/connectors/ACCESS/AMIE-Processor/config/config.go deleted file mode 100644 index a365aae1e..000000000 --- a/connectors/ACCESS/AMIE-Processor/config/config.go +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package config - -import ( - "fmt" - "os" - "strconv" - "time" - - "gopkg.in/yaml.v3" -) - -type Config struct { - Server ServerConfig `yaml:"server"` - Database DatabaseConfig `yaml:"database"` - AMIE AMIEConfig `yaml:"amie"` - Log LogConfig `yaml:"log"` -} - -type ServerConfig struct { - Port int `yaml:"port"` -} - -type DatabaseConfig struct { - DSN string `yaml:"dsn"` - MaxOpenConns int `yaml:"max_open_conns"` - MaxIdleConns int `yaml:"max_idle_conns"` -} - -type AMIEConfig struct { - BaseURL string `yaml:"base_url"` - SiteCode string `yaml:"site_code"` - APIKey string `yaml:"api_key"` - PollInterval time.Duration `yaml:"poll_interval"` - WorkerInterval time.Duration `yaml:"worker_interval"` - ConnectTimeout time.Duration `yaml:"connect_timeout"` - ReadTimeout time.Duration `yaml:"read_timeout"` - PollerEnabled bool `yaml:"poller_enabled"` -} - -type LogConfig struct { - Level string `yaml:"level"` - Format string `yaml:"format"` // "text" or "json" -} - -// Load reads config from a YAML file and applies environment variable overrides. -func Load(path string) (*Config, error) { - data, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("read config file: %w", err) - } - - var cfg Config - if err := yaml.Unmarshal(data, &cfg); err != nil { - return nil, fmt.Errorf("parse config file: %w", err) - } - - applyDefaults(&cfg) - applyEnvOverrides(&cfg) - - return &cfg, nil -} - -func applyDefaults(cfg *Config) { - if cfg.Server.Port == 0 { - cfg.Server.Port = 8083 - } - if cfg.Database.MaxOpenConns == 0 { - cfg.Database.MaxOpenConns = 25 - } - if cfg.Database.MaxIdleConns == 0 { - cfg.Database.MaxIdleConns = 5 - } - if cfg.AMIE.PollInterval == 0 { - cfg.AMIE.PollInterval = 30 * time.Second - } - if cfg.AMIE.WorkerInterval == 0 { - cfg.AMIE.WorkerInterval = 5 * time.Second - } - if cfg.AMIE.ConnectTimeout == 0 { - cfg.AMIE.ConnectTimeout = 5 * time.Second - } - if cfg.AMIE.ReadTimeout == 0 { - cfg.AMIE.ReadTimeout = 20 * time.Second - } - if !cfg.AMIE.PollerEnabled { - cfg.AMIE.PollerEnabled = true - } - if cfg.Log.Level == "" { - cfg.Log.Level = "info" - } - if cfg.Log.Format == "" { - cfg.Log.Format = "text" - } -} - -func applyEnvOverrides(cfg *Config) { - if v := os.Getenv("AMIE_SITE_CODE"); v != "" { - cfg.AMIE.SiteCode = v - } - if v := os.Getenv("AMIE_API_KEY"); v != "" { - cfg.AMIE.APIKey = v - } - if v := os.Getenv("AMIE_BASE_URL"); v != "" { - cfg.AMIE.BaseURL = v - } - if v := os.Getenv("DATABASE_DSN"); v != "" { - cfg.Database.DSN = v - } - if v := os.Getenv("SERVER_PORT"); v != "" { - if port, err := strconv.Atoi(v); err == nil { - cfg.Server.Port = port - } - } - if v := os.Getenv("LOG_LEVEL"); v != "" { - cfg.Log.Level = v - } - if v := os.Getenv("LOG_FORMAT"); v != "" { - cfg.Log.Format = v - } -} diff --git a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go index 2e2e41fa0..f9a3780fa 100644 --- a/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go +++ b/connectors/ACCESS/AMIE-Processor/pipeline/integration_common.go @@ -35,7 +35,6 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" amiedb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/handler" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/metrics" @@ -182,7 +181,7 @@ func newTestPipeline(t *testing.T) *testPipeline { t.Helper() database := setupTestDB(t) - cfg := config.AMIEConfig{ + cfg := amieclient.Config{ BaseURL: os.Getenv("AMIE_BASE_URL"), SiteCode: os.Getenv("AMIE_SITE_CODE"), APIKey: os.Getenv("AMIE_API_KEY"), diff --git a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go index 92a1f9b34..3ddddf8b7 100644 --- a/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go +++ b/connectors/ACCESS/AMIE-Processor/pkg/amie/loader.go @@ -33,7 +33,6 @@ import ( "github.com/jmoiron/sqlx" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" amiedb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/handler" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/metrics" @@ -69,7 +68,7 @@ const connectorName = "amie" // @name X-Custos-User-Id func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, coreService *coreservice.Service, wg *sync.WaitGroup, mux *http.ServeMux, connectorConfig *custosconfig.ConnectorConfig) error { cfg := loadConfig(connectorConfig) - if cfg.AMIE.APIKey == "" || cfg.AMIE.BaseURL == "" || cfg.AMIE.SiteCode == "" { + if cfg.APIKey == "" || cfg.BaseURL == "" || cfg.SiteCode == "" { slog.Warn("AMIE credentials not fully provided, skipping AMIE connector") return nil } @@ -92,12 +91,12 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, // One AMIE site is tied to one downstream cluster by protocol, so cluster // identity is per-deployment configuration rather than per-packet. - clusterID := os.Getenv("AMIE_CLUSTER_ID") + clusterID := loadClusterID(connectorConfig) if clusterID == "" { - slog.Warn("AMIE_CLUSTER_ID not set; account and project create handlers will fail") + slog.Warn("AMIE cluster id not set (cluster.id in YAML or AMIE_CLUSTER_ID env); account and project create handlers will fail") } - amie := amieclient.New(cfg.AMIE) + amie := amieclient.New(cfg) router := handler.NewRouter( handler.NewRequestProjectCreateHandler(coreService, clusterID, amie, auditSvc), @@ -115,8 +114,8 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, ) met := metrics.New() - poller := worker.NewPoller(amie, packetStore, eventStore, met, database, cfg.AMIE) - processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg.AMIE) + poller := worker.NewPoller(amie, packetStore, eventStore, met, database, cfg) + processor := worker.NewProcessor(eventStore, packetStore, errorStore, router, met, auditSvc, database, cfg) wg.Add(2) go func() { @@ -130,79 +129,97 @@ func LoadConnector(ctx context.Context, database *sqlx.DB, eventBus *events.Bus, slog.Info("AMIE processor stopped") }() - slog.Info("AMIE connector started", "site", cfg.AMIE.SiteCode, "baseURL", cfg.AMIE.BaseURL) + slog.Info("AMIE connector started", "site", cfg.SiteCode, "baseURL", cfg.BaseURL) return nil } -func loadConfig(connectorConfig *custosconfig.ConnectorConfig) *config.Config { - cfg := &config.Config{} +func loadConfig(connectorConfig *custosconfig.ConnectorConfig) amieclient.Config { + cfg := amieclient.Config{} + pollerEnabledSet := false - // Load from connector config if available if connectorConfig != nil { if credentials, err := connectorConfig.GetNestedConfig("credentials"); err == nil { if url, ok := credentials["base_url"].(string); ok { - cfg.AMIE.BaseURL = url + cfg.BaseURL = url } if code, ok := credentials["site_code"].(string); ok { - cfg.AMIE.SiteCode = code + cfg.SiteCode = code } if key, ok := credentials["api_key"].(string); ok { - cfg.AMIE.APIKey = key + cfg.APIKey = key } } if polling, err := connectorConfig.GetNestedConfig("polling"); err == nil { if interval, ok := polling["poll_interval"].(string); ok { if d, err := time.ParseDuration(interval); err == nil { - cfg.AMIE.PollInterval = d + cfg.PollInterval = d } } if interval, ok := polling["worker_interval"].(string); ok { if d, err := time.ParseDuration(interval); err == nil { - cfg.AMIE.WorkerInterval = d + cfg.WorkerInterval = d } } if enabled, ok := polling["poller_enabled"].(bool); ok { - cfg.AMIE.PollerEnabled = enabled + cfg.PollerEnabled = enabled + pollerEnabledSet = true } } if timeouts, err := connectorConfig.GetNestedConfig("timeouts"); err == nil { if timeout, ok := timeouts["connect_timeout"].(string); ok { if d, err := time.ParseDuration(timeout); err == nil { - cfg.AMIE.ConnectTimeout = d + cfg.ConnectTimeout = d + } + } + if timeout, ok := timeouts["read_timeout"].(string); ok { + if d, err := time.ParseDuration(timeout); err == nil { + cfg.ReadTimeout = d } } } } - // Fall back to environment variables - if cfg.AMIE.BaseURL == "" { - cfg.AMIE.BaseURL = os.Getenv("AMIE_BASE_URL") + if cfg.BaseURL == "" { + cfg.BaseURL = os.Getenv("AMIE_BASE_URL") } - if cfg.AMIE.SiteCode == "" { - cfg.AMIE.SiteCode = os.Getenv("AMIE_SITE_CODE") + if cfg.SiteCode == "" { + cfg.SiteCode = os.Getenv("AMIE_SITE_CODE") } - if cfg.AMIE.APIKey == "" { - cfg.AMIE.APIKey = os.Getenv("AMIE_API_KEY") + if cfg.APIKey == "" { + cfg.APIKey = os.Getenv("AMIE_API_KEY") } - if cfg.AMIE.PollInterval == 0 { - cfg.AMIE.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 30*time.Second) + if cfg.PollInterval == 0 { + cfg.PollInterval = durationEnv("AMIE_POLL_INTERVAL", 30*time.Second) } - if cfg.AMIE.WorkerInterval == 0 { - cfg.AMIE.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 5*time.Second) + if cfg.WorkerInterval == 0 { + cfg.WorkerInterval = durationEnv("AMIE_WORKER_INTERVAL", 5*time.Second) } - if cfg.AMIE.ConnectTimeout == 0 { - cfg.AMIE.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 5*time.Second) + if cfg.ConnectTimeout == 0 { + cfg.ConnectTimeout = durationEnv("AMIE_CONNECT_TIMEOUT", 5*time.Second) } - if cfg.AMIE.ReadTimeout == 0 { - cfg.AMIE.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second) + if cfg.ReadTimeout == 0 { + cfg.ReadTimeout = durationEnv("AMIE_READ_TIMEOUT", 20*time.Second) + } + if !pollerEnabledSet { + cfg.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", true) } - cfg.AMIE.PollerEnabled = boolEnv("AMIE_POLLER_ENABLED", cfg.AMIE.PollerEnabled) return cfg } +func loadClusterID(connectorConfig *custosconfig.ConnectorConfig) string { + if connectorConfig != nil { + if cluster, err := connectorConfig.GetNestedConfig("cluster"); err == nil { + if id, ok := cluster["id"].(string); ok && id != "" { + return id + } + } + } + return os.Getenv("AMIE_CLUSTER_ID") +} + func durationEnv(key string, fallback time.Duration) time.Duration { v := os.Getenv(key) if v == "" { diff --git a/connectors/ACCESS/AMIE-Processor/worker/poller.go b/connectors/ACCESS/AMIE-Processor/worker/poller.go index 0ddacbd51..40a712ade 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/poller.go +++ b/connectors/ACCESS/AMIE-Processor/worker/poller.go @@ -28,7 +28,7 @@ import ( "github.com/google/uuid" "github.com/jmoiron/sqlx" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" custosdb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" ) @@ -61,7 +61,7 @@ type Poller struct { enabled bool } -func NewPoller(client pollerAmieClient, packetStore pollerPacketStore, eventStore pollerEventStore, metrics pollerMetrics, db *sqlx.DB, cfg config.AMIEConfig) *Poller { +func NewPoller(client pollerAmieClient, packetStore pollerPacketStore, eventStore pollerEventStore, metrics pollerMetrics, db *sqlx.DB, cfg amieclient.Config) *Poller { return &Poller{ client: client, packetStore: packetStore, diff --git a/connectors/ACCESS/AMIE-Processor/worker/poller_integration_test.go b/connectors/ACCESS/AMIE-Processor/worker/poller_integration_test.go index 3ef7a8a5a..049169e69 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/poller_integration_test.go +++ b/connectors/ACCESS/AMIE-Processor/worker/poller_integration_test.go @@ -24,7 +24,7 @@ import ( "testing" "time" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" ) @@ -44,7 +44,7 @@ func newPoller(t *testing.T, client pollerAmieClient) (*Poller, *stubMetrics) { t.Helper() database := setupTestDB(t) met := &stubMetrics{} - cfg := config.AMIEConfig{ + cfg := amieclient.Config{ PollInterval: 10 * time.Millisecond, PollerEnabled: true, } @@ -186,7 +186,7 @@ func TestPoller_DisabledRunReturnsImmediately(t *testing.T) { database := setupTestDB(t) stub := &stubAmieClient{responses: [][]map[string]any{{makePollerPacket(5001, "request_project_create")}}} met := &stubMetrics{} - cfg := config.AMIEConfig{ + cfg := amieclient.Config{ PollInterval: 10 * time.Millisecond, PollerEnabled: false, } diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor.go b/connectors/ACCESS/AMIE-Processor/worker/processor.go index 11ee0e6cf..cce7c030b 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/processor.go +++ b/connectors/ACCESS/AMIE-Processor/worker/processor.go @@ -29,7 +29,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" custosdb "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/db" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/internal/tracing" @@ -90,7 +90,7 @@ type Processor struct { workerInterval time.Duration } -func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, auditSvc processorAuditService, db *sqlx.DB, cfg config.AMIEConfig) *Processor { +func NewProcessor(eventStore processorEventStore, packetStore processorPacketStore, errorStore processorErrorStore, router processorRouter, metrics processorMetrics, auditSvc processorAuditService, db *sqlx.DB, cfg amieclient.Config) *Processor { return &Processor{ eventStore: eventStore, packetStore: packetStore, diff --git a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go index 2978f52d3..0b454f7a7 100644 --- a/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go +++ b/connectors/ACCESS/AMIE-Processor/worker/processor_integration_test.go @@ -29,7 +29,7 @@ import ( "github.com/google/uuid" "github.com/jmoiron/sqlx" - "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/config" + "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/amieclient" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/model" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/service" "github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/store" @@ -85,7 +85,7 @@ func seedNewEvent(t *testing.T, database *sqlx.DB) (packetID, eventID string) { } func newProcessor(database *sqlx.DB, router processorRouter, met *stubMetrics) *Processor { - cfg := config.AMIEConfig{WorkerInterval: 50 * time.Millisecond} + cfg := amieclient.Config{WorkerInterval: 50 * time.Millisecond} return NewProcessor( store.NewEventStore(database), store.NewPacketStore(database), diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go index 390183dbe..d2d312f9c 100644 --- a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go +++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go @@ -42,10 +42,11 @@ func init() { tracing.RegisterTerminalMarkers("comanage", "ComanageClusterAccountAttached") } -// LoadConnector wires the subscriber to the event bus. If any required env -// var is missing, the loader logs and returns nil without registering. +// LoadConnector wires the subscriber to the event bus. Reads YAML config first +// and falls back to environment variables. If neither yields a complete +// config, it logs and returns nil without registering. func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, coreService *service.Service, _ *sync.WaitGroup, _ *http.ServeMux, connectorConfig *config.ConnectorConfig) error { - cfg, ok := loadConfigFromEnv() + cfg, ok := loadConfigFromConnectorConfig(connectorConfig) if !ok { cfg, ok = loadConfigFromEnv() if !ok { @@ -84,16 +85,12 @@ func loadConfigFromConnectorConfig(connectorConfig *config.ConnectorConfig) (cli if k, ok := registry["api_key"].(string); ok { apiKey = k } - if id, ok := registry["co_id"].(float64); ok { - coID = int(id) - } + coID = asInt(registry["co_id"]) } // Load unix_cluster config if unixCluster, err := connectorConfig.GetNestedConfig("unix_cluster"); err == nil { - if id, ok := unixCluster["id"].(float64); ok { - unixClusterID = int(id) - } + unixClusterID = asInt(unixCluster["id"]) if pType, ok := unixCluster["person_id_type"].(string); ok { personIDType = pType } @@ -142,6 +139,19 @@ func loadConfigFromConnectorConfig(connectorConfig *config.ConnectorConfig) (cli }, true } +// asInt handles the int / int64 / float64 shapes yaml.v3 may produce. +func asInt(v interface{}) int { + switch n := v.(type) { + case int: + return n + case int64: + return int(n) + case float64: + return int(n) + } + return 0 +} + func loadConfigFromEnv() (client.Config, bool) { registryURL := os.Getenv("COMANAGE_REGISTRY_URL") coIDStr := os.Getenv("COMANAGE_CO_ID") diff --git a/docs/API-Docs.md b/docs/API-Docs.md index 3d9ded43c..cae52f384 100644 --- a/docs/API-Docs.md +++ b/docs/API-Docs.md @@ -3,7 +3,7 @@ HTTP/JSON API exposed by `cmd/server`. All endpoints accept and return `application/json` and use UTF-8. -- **Base URL:** `http://<host>:<port>` (default port `8080`, configurable via `HTTP_ADDR`) +- **Base URL:** `http://<host>:<port>` (default port `8080`, configurable via `core.api.port` in `config/custos.yaml`) - **Auth:** none currently enforced (deploy behind a trusted ingress / auth proxy) - **Content-Type:** `application/json` is required on every request that has a body - **Unknown fields:** request bodies with unknown JSON fields are rejected with `400` @@ -1415,22 +1415,37 @@ curl -s $BASE/projects/$PROJ_ID | jq ## Running the server -```bash -export DATABASE_DSN='custos:secret@tcp(127.0.0.1:3306)/custos?parseTime=true&charset=utf8mb4' -# optional -export HTTP_ADDR=:8080 -export DB_MAX_OPEN_CONNS=25 -export DB_MAX_IDLE_CONNS=5 +The server is configured by a YAML file (default `config/custos.yaml`, override +with `CONFIG_PATH`). See [CONFIG.md](../CONFIG.md) for the full schema. +```bash go run ./cmd/server ``` -| Environment variable | Default | Purpose | -|----------------------|---------|---------| -| `DATABASE_DSN` | *(required)* | MySQL/MariaDB DSN. `parseTime=true` is mandatory. | -| `HTTP_ADDR` | `:8080` | Address the HTTP server binds to. | -| `DB_MAX_OPEN_CONNS` | `25` | Maximum open database connections. | -| `DB_MAX_IDLE_CONNS` | `5` | Maximum idle database connections. | +A minimal `config/custos.yaml`: + +```yaml +core: + database: + url: "custos:secret@tcp(127.0.0.1:3306)/custos?parseTime=true&charset=utf8mb4" + api: + port: 8080 + log_level: "info" + +connectors: {} +``` + +Secrets can stay out of the file via `${VAR}` substitution — e.g. +`url: "${CUSTOS_DB_DSN}"` reads the DSN from the environment at startup. + +| Setting | Source | Default | Purpose | +|---------|--------|---------|---------| +| `core.database.url` | YAML | *(required)* | MariaDB / MySQL DSN. `parseTime=true` is mandatory. | +| `core.api.port` | YAML | `8080` | HTTP API port. | +| `core.log_level` | YAML | `info` | One of `debug`, `info`, `warn`, `error`. | +| `DB_MAX_OPEN_CONNS` | env | `25` | Maximum open database connections. | +| `DB_MAX_IDLE_CONNS` | env | `5` | Maximum idle database connections. | +| `CONFIG_PATH` | env | `config/custos.yaml` | Override the config file location. | Migrations from `internal/db/migrations/` are applied automatically on startup.
