This is an automated email from the ASF dual-hosted git repository.
DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
The following commit(s) were added to refs/heads/master by this push:
new ec14c2eca [WIP] SLURM usage monitor connector (#485)
ec14c2eca is described below
commit ec14c2ecae68ea4a6d3fe3e70a38cf0ea1935e34
Author: Dimuthu Wannipurage <[email protected]>
AuthorDate: Fri Jun 12 00:03:23 2026 -0400
[WIP] SLURM usage monitor connector (#485)
* Moved slurm client to higher level module
* Adding slurm monitor and fetching job list api
* Refactoring allocation resource to have a direct link to compute cluster
* smonitor integration tests wip
* Validating job with the custos accounting details
* Fixing migration script naming issues
* Removing compute allocation fk constraint
* Calculating raw SU amount
---
connectors/SLURM/Association-Mapper/README.md | 8 +-
.../internal/subscribers/account.go | 2 +-
.../subscribers/accountsub_integration_test.go | 2 +-
.../internal/subscribers/members.go | 2 +-
.../subscribers/members_integration_test.go | 2 +-
.../internal/subscribers/subscriber.go | 2 +-
.../SLURM/Association-Mapper/pkg/smapper/loader.go | 2 +-
.../pkg/client}/accounts.go | 2 +-
.../pkg/client}/accounts_integration_test.go | 2 +-
.../pkg/client}/accounts_test.go | 2 +-
.../pkg/client}/associations.go | 2 +-
.../pkg/client}/associations_integration_test.go | 2 +-
.../pkg/client}/associations_test.go | 2 +-
.../pkg/client}/client.go | 5 +-
.../pkg/client}/client_test.go | 2 +-
.../pkg/client}/integration_common.go | 2 +-
connectors/SLURM/Rest-Client/pkg/client/jobs.go | 74 ++++
.../pkg/client/jobs_integration_test.go | 82 ++++
.../operations => Rest-Client/pkg/client}/tres.go | 2 +-
.../pkg/client}/tres_test.go | 2 +-
.../operations => Rest-Client/pkg/client}/types.go | 74 +++-
.../Usage-Monitor/internal/smonitor/smonitor.go | 165 +++++++
.../internal/smonitor/smonitor_integration_test.go | 265 ++++++++++++
.../SLURM/Usage-Monitor/pkg/monitor/loader.go | 42 ++
dev-ops/local-slurm/Makefile | 12 +
dev-ops/local-slurm/README.md | 3 +
.../local-slurm/scripts/bootstrap-accounting.sh | 6 +
internal/connectors/loader.go | 8 +-
internal/db/migrations/000004_allocations.up.sql | 4 +-
...6_compute_allocation_resource_cluster.down.sql} | 6 +-
...006_compute_allocation_resource_cluster.up.sql} | 7 +-
...es.down.sql => 000007_user_privileges.down.sql} | 0
...ileges.up.sql => 000007_user_privileges.up.sql} | 0
...000007_roles.down.sql => 000008_roles.down.sql} | 0
.../{000007_roles.up.sql => 000008_roles.up.sql} | 0
...00009_update_allocation_usage_columns.down.sql} | 5 +-
... 000009_update_allocation_usage_columns.up.sql} | 5 +-
.../compute_allocation_resource_mapping_store.go | 2 +-
.../store/compute_allocation_resource_store.go | 42 +-
internal/store/compute_allocation_usage_store.go | 13 +
internal/store/compute_cluster_user_store.go | 15 +
internal/store/store.go | 11 +
pkg/models/allocation.go | 13 +-
pkg/service/compute_allocation_resource.go | 44 ++
pkg/service/compute_allocation_usage.go | 19 +
pkg/service/compute_cluster_user.go | 19 +
pkg/service/interface.go | 4 +
pkg/service/mock.go | 476 +++++++++++++++------
...-tests.sh => slurm-mapper-integration-tests.sh} | 26 +-
scripts/slurm-monitor-integration-tests.sh | 81 ++++
50 files changed, 1379 insertions(+), 189 deletions(-)
diff --git a/connectors/SLURM/Association-Mapper/README.md
b/connectors/SLURM/Association-Mapper/README.md
index 2dff57ba3..ff32c411f 100644
--- a/connectors/SLURM/Association-Mapper/README.md
+++ b/connectors/SLURM/Association-Mapper/README.md
@@ -13,11 +13,13 @@ This package is part of the root
`github.com/apache/airavata-custos` module.
```
.
-├── internal/operations/ # slurmrestd client + accounts/associations/TRES
-└── pkg/operations/
+├── internal/subscribers/ # event subscribers wiring core events to slurmrestd
+└── pkg/smapper/ # connector loader
```
-Import path:
`github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations`.
+The slurmrestd client lives in the sibling Rest-Client module at `connectors/SLURM/Rest-Client/pkg/client`.
+
+Import path: `github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client`.
## Configuration
diff --git
a/connectors/SLURM/Association-Mapper/internal/subscribers/account.go
b/connectors/SLURM/Association-Mapper/internal/subscribers/account.go
index fb7f69f57..4e132aa8c 100644
--- a/connectors/SLURM/Association-Mapper/internal/subscribers/account.go
+++ b/connectors/SLURM/Association-Mapper/internal/subscribers/account.go
@@ -6,7 +6,7 @@ import (
"context"
"time"
- client
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/models"
)
diff --git
a/connectors/SLURM/Association-Mapper/internal/subscribers/accountsub_integration_test.go
b/connectors/SLURM/Association-Mapper/internal/subscribers/accountsub_integration_test.go
index 867f5ff29..4c701f456 100644
---
a/connectors/SLURM/Association-Mapper/internal/subscribers/accountsub_integration_test.go
+++
b/connectors/SLURM/Association-Mapper/internal/subscribers/accountsub_integration_test.go
@@ -3,7 +3,7 @@ package subscribers
import (
"context"
"fmt"
- operations
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+ operations
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/models"
"github.com/apache/airavata-custos/pkg/service"
"os"
diff --git
a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
index 1d8b294fd..e3fa20e36 100644
--- a/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
+++ b/connectors/SLURM/Association-Mapper/internal/subscribers/members.go
@@ -6,7 +6,7 @@ import (
"context"
"time"
- client
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/models"
)
diff --git
a/connectors/SLURM/Association-Mapper/internal/subscribers/members_integration_test.go
b/connectors/SLURM/Association-Mapper/internal/subscribers/members_integration_test.go
index 3bd90837d..b90a6152f 100644
---
a/connectors/SLURM/Association-Mapper/internal/subscribers/members_integration_test.go
+++
b/connectors/SLURM/Association-Mapper/internal/subscribers/members_integration_test.go
@@ -2,7 +2,7 @@ package subscribers
import (
"context"
-
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+ operations
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/models"
"github.com/apache/airavata-custos/pkg/service"
"os"
diff --git
a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
index 3a2fa5161..71070714f 100644
--- a/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
+++ b/connectors/SLURM/Association-Mapper/internal/subscribers/subscriber.go
@@ -4,7 +4,7 @@ import (
"context"
"time"
- client
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/models"
"github.com/apache/airavata-custos/pkg/service"
diff --git a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
index 9eee2ae74..66aba8e6a 100644
--- a/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
+++ b/connectors/SLURM/Association-Mapper/pkg/smapper/loader.go
@@ -8,8 +8,8 @@ import (
"github.com/jmoiron/sqlx"
- client
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/operations"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/internal/subscribers"
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/accounts.go
b/connectors/SLURM/Rest-Client/pkg/client/accounts.go
similarity index 98%
rename from connectors/SLURM/Association-Mapper/internal/operations/accounts.go
rename to connectors/SLURM/Rest-Client/pkg/client/accounts.go
index 18cc0e456..c58a26fc1 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/accounts.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/accounts.go
@@ -1,5 +1,5 @@
// cli/internal/client/accounts.go
-package operations
+package client
import "fmt"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/accounts_integration_test.go
b/connectors/SLURM/Rest-Client/pkg/client/accounts_integration_test.go
similarity index 99%
rename from
connectors/SLURM/Association-Mapper/internal/operations/accounts_integration_test.go
rename to connectors/SLURM/Rest-Client/pkg/client/accounts_integration_test.go
index 5a1a41df8..e20228bcb 100644
---
a/connectors/SLURM/Association-Mapper/internal/operations/accounts_integration_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/accounts_integration_test.go
@@ -1,4 +1,4 @@
-package operations
+package client
import (
"os"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go
b/connectors/SLURM/Rest-Client/pkg/client/accounts_test.go
similarity index 99%
rename from
connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go
rename to connectors/SLURM/Rest-Client/pkg/client/accounts_test.go
index aaeae99df..bdf5c4f5c 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/accounts_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/accounts_test.go
@@ -1,5 +1,5 @@
// cli/internal/client/accounts_test.go
-package operations
+package client
import (
"encoding/json"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/associations.go
b/connectors/SLURM/Rest-Client/pkg/client/associations.go
similarity index 99%
rename from
connectors/SLURM/Association-Mapper/internal/operations/associations.go
rename to connectors/SLURM/Rest-Client/pkg/client/associations.go
index 1ecf66033..6579944d9 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/associations.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/associations.go
@@ -1,5 +1,5 @@
// cli/internal/client/associations.go
-package operations
+package client
import (
"errors"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/associations_integration_test.go
b/connectors/SLURM/Rest-Client/pkg/client/associations_integration_test.go
similarity index 99%
rename from
connectors/SLURM/Association-Mapper/internal/operations/associations_integration_test.go
rename to
connectors/SLURM/Rest-Client/pkg/client/associations_integration_test.go
index bfe296297..7ec08e88d 100644
---
a/connectors/SLURM/Association-Mapper/internal/operations/associations_integration_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/associations_integration_test.go
@@ -1,4 +1,4 @@
-package operations
+package client
import (
"os"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/associations_test.go
b/connectors/SLURM/Rest-Client/pkg/client/associations_test.go
similarity index 99%
rename from
connectors/SLURM/Association-Mapper/internal/operations/associations_test.go
rename to connectors/SLURM/Rest-Client/pkg/client/associations_test.go
index 9f394eed4..dac380c88 100644
---
a/connectors/SLURM/Association-Mapper/internal/operations/associations_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/associations_test.go
@@ -1,5 +1,5 @@
// cli/internal/client/associations_test.go
-package operations
+package client
import (
"encoding/json"
diff --git a/connectors/SLURM/Association-Mapper/internal/operations/client.go
b/connectors/SLURM/Rest-Client/pkg/client/client.go
similarity index 96%
rename from connectors/SLURM/Association-Mapper/internal/operations/client.go
rename to connectors/SLURM/Rest-Client/pkg/client/client.go
index a60ab883a..85eacf017 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/client.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/client.go
@@ -1,11 +1,12 @@
// cli/internal/client/client.go
-package operations
+package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
+ "log"
"net/http"
"strings"
"time"
@@ -37,6 +38,8 @@ func (c *Client) do(method, path string, body any, out any)
(*http.Response, err
return nil, err
}
reqBody = bytes.NewReader(buf)
+
+ log.Printf("Request body: %s", string(buf))
}
req, err := http.NewRequest(method, c.baseURL+path, reqBody)
if err != nil {
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/client_test.go
b/connectors/SLURM/Rest-Client/pkg/client/client_test.go
similarity index 98%
rename from
connectors/SLURM/Association-Mapper/internal/operations/client_test.go
rename to connectors/SLURM/Rest-Client/pkg/client/client_test.go
index c73d3ecb5..22f4b3fbe 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/client_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/client_test.go
@@ -1,5 +1,5 @@
// cli/internal/client/client_test.go
-package operations
+package client
import (
"net/http"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/integration_common.go
b/connectors/SLURM/Rest-Client/pkg/client/integration_common.go
similarity index 97%
rename from
connectors/SLURM/Association-Mapper/internal/operations/integration_common.go
rename to connectors/SLURM/Rest-Client/pkg/client/integration_common.go
index 6bcd8bc96..ec799ddb6 100644
---
a/connectors/SLURM/Association-Mapper/internal/operations/integration_common.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/integration_common.go
@@ -1,4 +1,4 @@
-package operations
+package client
import "os"
import "testing"
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs.go
b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
new file mode 100644
index 000000000..d29a041d3
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs.go
@@ -0,0 +1,74 @@
+package client
+
+/*
+curl -s -X GET \
+ "http://localhost:6820/slurmdb/v0.0.41/jobs" \
+ -H "X-SLURM-USER-NAME: root" \
+ -H "X-SLURM-USER-TOKEN: $SLURM_JWT" \
+ -H "Content-Type: application/json" \
+ -d '{
+ "users": ["root"],
+ "start_time": {
+ "set": true,
+ "infinite": false,
+ "number": 1746057600
+ }
+ }'
+*/
+
+import (
+ "fmt"
+)
+
+type jobsResponse struct {
+ Jobs []JobInfo `json:"jobs"`
+}
+
+type JobFilter struct {
+ Users []string `json:"users,omitempty"`
+ StartTime int64 `json:"start_time,omitempty"`
+ EndTime int64 `json:"end_time,omitempty"`
+}
+
+type internalJobFilter struct {
+ Users []string `json:"users,omitempty"`
+ StartTime *SlurmNumber `json:"start_time,omitempty"`
+ EndTime *SlurmNumber `json:"end_time,omitempty"`
+}
+
+func (c *Client) ListJobs(filter JobFilter) ([]JobInfo, error) {
+ var out jobsResponse
+ internalFilter := internalJobFilter{
+ Users: filter.Users,
+ }
+ if filter.StartTime > 0 {
+ internalFilter.StartTime = &SlurmNumber{Set: true, Infinite:
false, Number: filter.StartTime}
+ }
+ if filter.EndTime > 0 {
+ internalFilter.EndTime = &SlurmNumber{Set: true, Infinite:
false, Number: filter.EndTime}
+ }
+
+ if _, err := c.do("GET", "/slurmdb/v0.0."+c.apiVersion+"/jobs",
internalFilter, &out); err != nil {
+ return nil, err
+ }
+ return out.Jobs, nil
+}
+
+func (c *Client) GetJob(id int64) (*JobInfo, error) {
+ var out jobsResponse
+ if _, err := c.do("GET", fmt.Sprintf("/slurmdb/v0.0.%s/job/%d",
c.apiVersion, id), nil, &out); err != nil {
+ return nil, err
+ }
+ if len(out.Jobs) == 0 {
+ return nil, fmt.Errorf("job %d not found", id)
+ }
+ return &out.Jobs[0], nil
+}
+
+func (c *Client) SubmitJob(request JobSubmitRequest) (*JobSubmitResponse,
error) {
+ var out JobSubmitResponse
+ if _, err := c.do("POST", fmt.Sprintf("/slurm/v0.0.%s/job/submit",
c.apiVersion), request, &out); err != nil {
+ return nil, err
+ }
+ return &out, nil
+}
diff --git a/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
new file mode 100644
index 000000000..0b5ebd3f2
--- /dev/null
+++ b/connectors/SLURM/Rest-Client/pkg/client/jobs_integration_test.go
@@ -0,0 +1,82 @@
+package client
+
+import (
+ "log"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestListJobs(t *testing.T) {
+ if !IsLocalSlurmConfigAvailable() {
+ t.Skip("Skipping integration test for listing jobs because
local SLURM config is not available")
+ }
+
+ apiUrl := "http://localhost:6820"
+ user := os.Getenv("TEST_SLURM_USER")
+ token := os.Getenv("TEST_SLURM_TOKEN")
+ apiVersion := os.Getenv("TEST_SLURM_API_VERSION")
+
+ client := New(apiUrl, user, token, apiVersion)
+
+ JobSubmitRequest := JobSubmitRequest{
+ JobSubmitParam: JobSubmitParam{
+ Name: "test-job",
+ Account: "root",
+ Partition: "compute",
+ Tasks: 1,
+ CpusPerTask: 1,
+ TimeLimit: SlurmNumber{
+ Set: true,
+ Infinite: false,
+ Number: 20, // 10 seconds
+ },
+ CurrentWorkingDir: "/home/testuser",
+ Environment: []string{
+ "TEST_ENV_VAR=test_value",
+ },
+ // You can set other job parameters here if needed
+ },
+ Script: "#!/bin/bash\nsleep 1", // Simple script that sleeps
for 1 second
+ }
+
+ currentTime := time.Now().Unix()
+ totalJobsToSubmit := 12
+ for i := 0; i < totalJobsToSubmit; i++ {
+ resp, err := client.SubmitJob(JobSubmitRequest)
+ if err != nil {
+ t.Fatalf("Failed to submit job: %v", err)
+ }
+
+ if resp.JobID == 0 {
+ t.Fatalf("Invalid job ID returned from job submission:
%d", resp.JobID)
+ }
+
+ log.Printf("Submitted job with ID: %d", resp.JobID)
+ }
+
+ sleepDuration := 20 // seconds
+ log.Printf("Sleeping for %d seconds to allow job to start...",
sleepDuration)
+ time.Sleep(time.Duration(sleepDuration) * time.Second)
+
+ filter := JobFilter{
+ // You can set filter parameters here if needed
+ Users: []string{"root"},
+ StartTime: currentTime,
+ }
+
+ jobs, err := client.ListJobs(filter)
+ if err != nil {
+ t.Fatalf("Failed to list jobs: %v", err)
+ }
+ if len(jobs) == 0 {
+ t.Log("No jobs found")
+ } else {
+ t.Logf("Found %d jobs", len(jobs))
+ if len(jobs) != totalJobsToSubmit {
+ t.Logf("Expected at %d jobs, but found %d",
totalJobsToSubmit, len(jobs))
+ } else {
+ t.Log("Successfully found all submitted jobs")
+ }
+ }
+}
diff --git a/connectors/SLURM/Association-Mapper/internal/operations/tres.go
b/connectors/SLURM/Rest-Client/pkg/client/tres.go
similarity index 97%
rename from connectors/SLURM/Association-Mapper/internal/operations/tres.go
rename to connectors/SLURM/Rest-Client/pkg/client/tres.go
index 313c58c59..aaad5fcf1 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/tres.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/tres.go
@@ -1,5 +1,5 @@
// cli/internal/client/tres.go
-package operations
+package client
import (
"fmt"
diff --git
a/connectors/SLURM/Association-Mapper/internal/operations/tres_test.go
b/connectors/SLURM/Rest-Client/pkg/client/tres_test.go
similarity index 97%
rename from connectors/SLURM/Association-Mapper/internal/operations/tres_test.go
rename to connectors/SLURM/Rest-Client/pkg/client/tres_test.go
index 79ba50faf..43b6c29f4 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/tres_test.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/tres_test.go
@@ -1,5 +1,5 @@
// cli/internal/client/tres_test.go
-package operations
+package client
import (
"reflect"
diff --git a/connectors/SLURM/Association-Mapper/internal/operations/types.go
b/connectors/SLURM/Rest-Client/pkg/client/types.go
similarity index 68%
rename from connectors/SLURM/Association-Mapper/internal/operations/types.go
rename to connectors/SLURM/Rest-Client/pkg/client/types.go
index e00e83d3f..0f6f208a3 100644
--- a/connectors/SLURM/Association-Mapper/internal/operations/types.go
+++ b/connectors/SLURM/Rest-Client/pkg/client/types.go
@@ -1,5 +1,5 @@
// cli/internal/client/types.go
-package operations
+package client
import "encoding/json"
@@ -52,22 +52,22 @@ type Association struct {
Limits AssocLimits `json:"-"`
}
-// slurmNumber matches slurmrestd's {set, infinite, number} triple used for
+// SlurmNumber matches slurmrestd's {set, infinite, number} triple used for
// all scalar limit values in the v0.0.41 accounting schema.
-type slurmNumber struct {
+type SlurmNumber struct {
Set bool `json:"set"`
Infinite bool `json:"infinite"`
Number int64 `json:"number"`
}
-func numPtr(n *int64) *slurmNumber {
+func numPtr(n *int64) *SlurmNumber {
if n == nil {
return nil
}
- return &slurmNumber{Set: true, Number: *n}
+ return &SlurmNumber{Set: true, Number: *n}
}
-func ptrNum(n *slurmNumber) *int64 {
+func ptrNum(n *SlurmNumber) *int64 {
if n == nil || !n.Set || n.Infinite {
return nil
}
@@ -87,8 +87,8 @@ type assocMaxJobs struct {
}
type assocMaxJobsPer struct {
- Count *slurmNumber `json:"count,omitempty"` // GrpJobs
- WallClock *slurmNumber `json:"wall_clock,omitempty"` //
MaxWallDurationPerJob (seconds)
+ Count *SlurmNumber `json:"count,omitempty"` // GrpJobs
+ WallClock *SlurmNumber `json:"wall_clock,omitempty"` //
MaxWallDurationPerJob (seconds)
}
type assocMaxTRES struct {
@@ -184,3 +184,61 @@ func (a *Association) UnmarshalJSON(data []byte) error {
}
return nil
}
+
+type JobTime struct {
+ Elapsed int64 `json:"elapsed"`
+ Eligible int64 `json:"eligible"`
+ End int64 `json:"end"`
+ Start int64 `json:"start"`
+ Submission int64 `json:"submission"`
+ Suspended int64 `json:"suspended"`
+}
+
+type JobTresInfo struct {
+ Allocated []TRES `json:"allocated,omitempty"`
+ Requested []TRES `json:"requested,omitempty"`
+}
+
+type JobExitInfo struct {
+ Status []string `json:"status"`
+ ReturnCode SlurmNumber `json:"return_code"`
+}
+
+type JobInfo struct {
+ Account string `json:"account"`
+ Cluster string `json:"cluster"`
+ Time JobTime `json:"time"`
+ JobID int64 `json:"job_id"`
+ Name string `json:"name"`
+ Partition string `json:"partition"`
+ QoS string `json:"qos"`
+ User string `json:"user"`
+ Nodes string `json:"nodes"`
+ Tres JobTresInfo `json:"tres"`
+ ExitCode JobExitInfo `json:"exit_code"`
+ DerivedExitCode JobExitInfo `json:"derived_exit_code"`
+}
+
+type JobSubmitParam struct {
+ Account string `json:"account"`
+ Partition string `json:"partition,omitempty"`
+ QoS string `json:"qos,omitempty"`
+ Name string `json:"name,omitempty"`
+ Tasks int64 `json:"tasks,omitempty"`
+ CurrentWorkingDir string
`json:"current_working_directory,omitempty"`
+ Environment []string `json:"environment,omitempty"`
+ CpusPerTask int64 `json:"cpus_per_task,omitempty"`
+ Memory int64 `json:"memory,omitempty"`
+ TimeLimit SlurmNumber `json:"time_limit,omitempty"` // seconds
+}
+
+type JobSubmitRequest struct {
+ JobSubmitParam JobSubmitParam `json:"job"`
+ Script string `json:"script"`
+}
+
+type JobSubmitResponse struct {
+ JobID int64 `json:"job_id"`
+ StepID string `json:"step_id"`
+ JobSubmitUserMsg string `json:"job_submit_user_msg"`
+}
diff --git a/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
new file mode 100644
index 000000000..940670d52
--- /dev/null
+++ b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go
@@ -0,0 +1,165 @@
+package smonitor
+
+import (
+ "context"
+ "log/slog"
+ "strconv"
+ "time"
+
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+ "github.com/apache/airavata-custos/pkg/events"
+ "github.com/apache/airavata-custos/pkg/models"
+ "github.com/apache/airavata-custos/pkg/service"
+)
+
+const monitorInterval = 30 * time.Second
+
+type SlurmMonitor struct {
+ slurmClient *client.Client
+ eventBus *events.Bus
+ coreService service.CoreService
+ clusterId string
+ lastMonitorTime int64
+}
+
+func NewSlurmMonitor(slurmClient *client.Client, eventBus *events.Bus,
coreService service.CoreService, clusterId string) *SlurmMonitor {
+ return &SlurmMonitor{
+ slurmClient: slurmClient,
+ eventBus: eventBus,
+ coreService: coreService,
+ clusterId: clusterId,
+ lastMonitorTime: 1, // initialize to 1 to avoid issues with
zero value
+ }
+}
+
+func (m *SlurmMonitor) StartMonitor(ctx context.Context) {
+ ticker := time.NewTicker(monitorInterval)
+ defer ticker.Stop()
+
+ slog.Info("Starting SLURM usage monitor", "interval", monitorInterval)
+ for {
+ select {
+ case <-ctx.Done():
+ slog.Info("Stopping SLURM usage monitor", "reason",
ctx.Err())
+ return
+ case <-ticker.C:
+ m.poll()
+ }
+ }
+}
+
+func (m *SlurmMonitor) poll() {
+ slog.Debug("polling SLURM usage")
+ context, cancel := context.WithTimeout(context.Background(),
10*time.Second)
+ defer cancel()
+ cluster, err := m.coreService.GetComputeCluster(context, m.clusterId)
+ if err != nil {
+ slog.Error("failed to get compute cluster", "error", err)
+ return
+ }
+
+ allocations, err :=
m.coreService.ListComputeAllocationsByCluster(context, cluster.ID)
+ if err != nil {
+ slog.Error("failed to list compute allocations", "error", err)
+ return
+ }
+
+ jobFilter := client.JobFilter{
+ StartTime: m.lastMonitorTime,
+ EndTime: time.Now().Unix(),
+ }
+
+ jobs, err := m.slurmClient.ListJobs(jobFilter)
+ if err != nil {
+ slog.Error("failed to list SLURM jobs", "error", err)
+ return
+ }
+ m.lastMonitorTime = jobFilter.EndTime
+
+ for _, job := range jobs {
+ //slog.Debug("processing SLURM job", "job_id", job.JobID,
"job_name", job.Name)
+ //m.coreService.GetComputeAllocationResource()
+ slog.Info("Job object", "job", job)
+ targetAccount := job.Account
+ for _, alloc := range allocations {
+ if alloc.Name == targetAccount {
+ slog.Info("found matching compute allocation
for SLURM job", "job_id", job.JobID, "allocation_id", alloc.ID)
+
+ user, err :=
m.coreService.GetComputeClusterUserByLocalUsernameAndCluster(context, job.User,
cluster.ID)
+ if err != nil {
+ if err == service.ErrNotFound {
+ slog.Warn("compute cluster user
not found for SLURM job, skipping usage recording", "local_username", job.User,
"cluster_id", cluster.ID)
+ return
+ } else {
+ slog.Error("failed to get
compute cluster user", "error", err)
+ return
+ }
+ }
+
+ resource, err :=
m.coreService.GetComputeAllocationResourceByNameAndCluster(context,
job.Partition, cluster.ID)
+
+ if err != nil {
+ if err == service.ErrNotFound {
+ slog.Warn("compute allocation
resource not found for SLURM job, skipping usage recording", "resource_name",
job.Partition, "cluster_id", cluster.ID)
+ return
+ } else {
+ slog.Error("failed to get
compute allocation resource", "error", err)
+ return
+ }
+ }
+
+ jobId := strconv.FormatInt(job.JobID, 10)
+ existing, err :=
m.coreService.GetComputeAllocationUsageByComputeAllocationIDAndJobID(context,
alloc.ID, jobId)
+
+ if err != nil && err != service.ErrNotFound {
+ slog.Error("failed to check for
existing compute allocation usage", "error", err)
+ return
+ }
+
+ jobDurationMs := job.Time.End - job.Time.Start
+
+ if jobDurationMs <= 0 {
+ slog.Warn("SLURM job has non-positive
duration, skipping usage recording", "job_id", job.JobID, "duration",
jobDurationMs)
+ return
+ }
+
+ tresType := resource.ResourceType
+
+ resourceAmount := int64(0)
+ nodeCount := int64(0)
+ for _, tres := range job.Tres.Allocated {
+ // Process each TRES type and its
allocated amount as needed
+ // Example tres entry
Allocated:[{Type:cpu Name: Count:1} {Type:mem Name: Count:8000} {Type:energy
Name: Count:-2} {Type:node Name: Count:1} {Type:billing Name: Count:1}]
+ if tres.Type == tresType {
+ resourceAmount = tres.Count
+ }
+ if tres.Type == "node" {
+ nodeCount = tres.Count
+ }
+ }
+
+ calulatedRawAmount := float64(resourceAmount) *
float64(nodeCount) * float64(jobDurationMs) / (1000 * 3600) // Convert to
minutes, adjust as needed based on how you want to calculate usage
+
+ usageModel := &models.ComputeAllocationUsage{
+ ComputeAllocationID: alloc.ID,
+ UsedRawAmount:
calulatedRawAmount, // This is a simplification, adjust as needed based on
how you want to calculate usage
+ UsedSUAmount:
calulatedRawAmount * 1, // Assuming 1 SU per second for simplicity, adjust as
needed based on your SU calculation logic
+ CalculatedTime: time.Now(),
+ UserID: user.ID,
+ JobID: jobId,
+ ComputeAllocationResourceID:
resource.ID,
+ }
+
+ if existing != nil {
+
m.coreService.DeleteComputeAllocationUsage(context, existing.ID)
+ slog.Info("deleted existing compute
allocation usage for SLURM job", "job_id", job.JobID, "existing_usage_id",
existing.ID)
+ }
+
m.coreService.CreateComputeAllocationUsage(context, usageModel)
+ break
+ }
+ }
+ }
+
+ slog.Info("successfully polled SLURM usage", "num_allocations",
len(allocations), "num_jobs", len(jobs))
+
+}
diff --git
a/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor_integration_test.go
b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor_integration_test.go
new file mode 100644
index 000000000..372b2b651
--- /dev/null
+++
b/connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor_integration_test.go
@@ -0,0 +1,265 @@
+package smonitor
+
import (
	"context"
	"os"
	"strings"
	"testing"
	"time"

	operations "github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
	"github.com/apache/airavata-custos/pkg/models"
	"github.com/apache/airavata-custos/pkg/service"
)
+
+func TestSlurmMonitorIntegration(t *testing.T) {
+ if !operations.IsLocalSlurmConfigAvailable() {
+ t.Skip("Skipping integration test for listing jobs because
local SLURM config is not available")
+ }
+
+ apiUrl := "http://localhost:6820"
+ user := os.Getenv("TEST_SLURM_USER")
+ token := os.Getenv("TEST_SLURM_TOKEN")
+ user1Token := os.Getenv("TEST_SLURM_TOKEN2")
+ user2Token := os.Getenv("TEST_SLURM_TOKEN3")
+ user3Token := os.Getenv("TEST_SLURM_TOKEN4")
+ apiVersion := os.Getenv("TEST_SLURM_API_VERSION")
+
+ clusterName := "artisan"
+ user1 := "testuser"
+ user2 := "testuser2"
+ user3 := "testuser3"
+ rootClient := operations.New(apiUrl, user, token, apiVersion)
+ user1Client := operations.New(apiUrl, user1, user1Token, apiVersion)
+ user2Client := operations.New(apiUrl, user2, user2Token, apiVersion)
+ user3Client := operations.New(apiUrl, user3, user3Token, apiVersion)
+
+ currentTime := time.Now().Unix()
+
+ accountName := "researchers"
+ slurmAccount := operations.Account{
+ Name: accountName,
+ Description: "Researchers account",
+ Organization: "researchers-org",
+ }
+
+ rootClient.CreateAccount(slurmAccount, clusterName)
+
+ rootClient.UpsertAssociation(operations.Association{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user1,
+ })
+
+ defer rootClient.DeleteAssociation(operations.AssocFilter{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user1,
+ })
+
+ rootClient.UpsertAssociation(operations.Association{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user2,
+ })
+
+ defer rootClient.DeleteAssociation(operations.AssocFilter{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user2,
+ })
+
+ rootClient.UpsertAssociation(operations.Association{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user3,
+ })
+
+ defer rootClient.DeleteAssociation(operations.AssocFilter{
+ Account: accountName,
+ Cluster: clusterName,
+ User: user3,
+ })
+
+ defer rootClient.DeleteAccount(accountName)
+
+ user1Client.SubmitJob(operations.JobSubmitRequest{
+ JobSubmitParam: operations.JobSubmitParam{
+ Name: "test-job",
+ Account: accountName,
+ Partition: "compute",
+ Tasks: 1,
+ CpusPerTask: 1,
+ TimeLimit: operations.SlurmNumber{
+ Set: true,
+ Infinite: false,
+ Number: 20, // 10 seconds
+ },
+ CurrentWorkingDir: "/home/" + user1,
+ Environment: []string{
+ "TEST_ENV_VAR=test_value",
+ },
+ // You can set other job parameters here if needed
+ },
+ Script: "#!/bin/bash\nsleep 2", // Simple script that sleeps
for 1 second
+ })
+
+ user2Client.SubmitJob(operations.JobSubmitRequest{
+ JobSubmitParam: operations.JobSubmitParam{
+ Name: "test-job2",
+ Account: accountName,
+ Partition: "compute",
+ Tasks: 1,
+ CpusPerTask: 1,
+ TimeLimit: operations.SlurmNumber{
+ Set: true,
+ Infinite: false,
+ Number: 20, // 10 seconds
+ },
+ CurrentWorkingDir: "/home/" + user2,
+ Environment: []string{
+ "TEST_ENV_VAR=test_value",
+ },
+ // You can set other job parameters here if needed
+ },
+ Script: "#!/bin/bash\nsleep 2", // Simple script that sleeps
for 1 second
+ })
+
+ user3Client.SubmitJob(operations.JobSubmitRequest{
+ JobSubmitParam: operations.JobSubmitParam{
+ Name: "test-job3",
+ Account: accountName,
+ Partition: "compute",
+ Tasks: 1,
+ CpusPerTask: 1,
+ TimeLimit: operations.SlurmNumber{
+ Set: true,
+ Infinite: false,
+ Number: 20, // 10 seconds
+ },
+ CurrentWorkingDir: "/home/" + user3,
+ Environment: []string{
+ "TEST_ENV_VAR=test_value",
+ },
+ // You can set other job parameters here if needed
+ },
+ Script: "#!/bin/bash\nsleep 2", // Simple script that sleeps
for 1 second
+ })
+
+ // Sleep for a while to allow the monitor to pick up the job
+ sleepDuration := 8 // seconds
+ t.Logf("Sleeping for %d seconds to allow monitor to pick up the
job...", sleepDuration)
+ time.Sleep(time.Duration(sleepDuration) * time.Second)
+
+ filter := operations.JobFilter{
+ // You can set filter parameters here if needed
+ Users: []string{"root", user1, user2, user3},
+ StartTime: currentTime,
+ }
+
+ jobs, err := rootClient.ListJobs(filter)
+ if err != nil {
+ t.Fatalf("Failed to list jobs: %v", err)
+ }
+ if len(jobs) == 0 {
+ t.Log("No jobs found")
+ }
+ t.Logf("Found %d jobs", len(jobs))
+
+ comAllcUsages := make([]*models.ComputeAllocationUsage, 0)
+
+ mockCoreService := &service.CoreServiceMock{
+
+ GetComputeClusterFunc: func(ctx context.Context, clusterId
string) (*models.ComputeCluster, error) {
+ return &models.ComputeCluster{
+ ID: clusterId,
+ Name: "artisan",
+ }, nil
+ },
+
+ ListComputeAllocationsByClusterFunc: func(ctx context.Context,
clusterId string) ([]models.ComputeAllocation, error) {
+ return []models.ComputeAllocation{
+ {
+ ID: "allocation-1",
+ Name: accountName, // Match
the SLURM account name to link jobs to this allocation
+ ComputeClusterID: clusterId,
+ },
+ {
+ ID: "allocation-2",
+ Name: "Test Allocation 2",
+ ComputeClusterID: clusterId,
+ },
+ }, nil
+ },
+
+ GetComputeClusterUserByLocalUsernameAndClusterFunc: func(ctx
context.Context, localUsername string, clusterId string)
(*models.ComputeClusterUser, error) {
+ return &models.ComputeClusterUser{
+ ID: "user-" + localUsername,
+ ComputeClusterID: clusterId,
+ UserID: "user-id-" + localUsername,
+ LocalUsername: localUsername,
+ }, nil
+ },
+
+ CreateComputeAllocationUsageFunc: func(ctx context.Context, u
*models.ComputeAllocationUsage) (*models.ComputeAllocationUsage, error) {
+ t.Logf("CreateComputeAllocationUsage called with: %+v",
u)
+ comAllcUsages = append(comAllcUsages, u)
+ return u, nil
+ },
+
+ GetComputeAllocationResourceByNameAndClusterFunc: func(ctx
context.Context, name string, clusterId string)
(*models.ComputeAllocationResource, error) {
+ return &models.ComputeAllocationResource{
+ ID: "resource-" + name,
+ Name: name,
+ ResourceType: "cpu",
+ ResourceAmount: 4,
+ ComputeClusterID: clusterId,
+ }, nil
+ },
+
+ GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc:
func(ctx context.Context, allocationID string, jobID string)
(*models.ComputeAllocationUsage, error) {
+ return nil, service.ErrNotFound
+ },
+
+ DeleteComputeAllocationUsageFunc: func(ctx context.Context,
usageID string) error {
+ t.Logf("DeleteComputeAllocationUsage called with ID:
%s", usageID)
+ return nil
+ },
+ }
+
+ monitor := &SlurmMonitor{
+ slurmClient: rootClient, // You can use a mock or real
client here depending on your testing strategy
+ eventBus: nil, // You can use a mock or real
event bus here depending on your testing strategy
+ coreService: mockCoreService,
+ clusterId: "test-cluster",
+ lastMonitorTime: currentTime, // initialize to 1 to avoid
issues with zero value
+ }
+
+ // Call the poll method directly for testing
+ monitor.poll()
+
+ t.Logf("Total CreateComputeAllocationUsage calls: %d",
len(comAllcUsages))
+
+ // Validate that CreateComputeAllocationUsage was called with expected
values
+ if len(comAllcUsages) == 0 {
+ t.Fatal("Expected CreateComputeAllocationUsage to be called at
least once, but it was not called")
+ }
+
+ for _, usage := range comAllcUsages {
+ t.Logf("Validating ComputeAllocationUsage: %+v", usage)
+ if usage.ComputeAllocationID != "allocation-1" {
+ t.Errorf("Expected ComputeAllocationID to be
'allocation-1', got '%s'", usage.ComputeAllocationID)
+ }
+ if usage.UsedRawAmount <= 0 {
+ t.Errorf("Expected UsedRawAmount to be greater than 0,
got %f", usage.UsedRawAmount)
+ }
+ if usage.UsedSUAmount <= 0 {
+ t.Errorf("Expected UsedSUAmount to be greater than 0,
got %f", usage.UsedSUAmount)
+ }
+ if usage.UserID != "user-"+usage.UserID[5:] { // Extract local
username from UserID
+ t.Errorf("Expected UserID to be 'user-%s', got '%s'",
usage.UserID[5:], usage.UserID)
+ }
+ if usage.JobID == "" {
+ t.Error("Expected JobID to be set, but it was empty")
+ }
+ }
+
+}
diff --git a/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
new file mode 100644
index 000000000..e8c314f60
--- /dev/null
+++ b/connectors/SLURM/Usage-Monitor/pkg/monitor/loader.go
@@ -0,0 +1,42 @@
+package monitor
+
+import (
+ "context"
+ "log/slog"
+ "os"
+ "sync"
+
+
"github.com/apache/airavata-custos/connectors/SLURM/Rest-Client/pkg/client"
+
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/internal/smonitor"
+ "github.com/apache/airavata-custos/pkg/events"
+ "github.com/apache/airavata-custos/pkg/service"
+ "github.com/jmoiron/sqlx"
+)
+
+func LoadConnector(ctx context.Context, _ *sqlx.DB, eventBus *events.Bus,
coreService *service.Service, wg *sync.WaitGroup) error {
+
+ // Read url, username, and password from environment variables
+ apiUrl := os.Getenv("SLURM_API")
+ user := os.Getenv("SLURM_USER")
+ token := os.Getenv("SLURM_TOKEN")
+ apiVersion := os.Getenv("SLURM_API_VERSION")
+ monitorClusterID := os.Getenv("SLURM_MONITOR_CLUSTER_ID")
+ if monitorClusterID == "" {
+ slog.Warn("SLURM_MONITOR_CLUSTER_ID not set, defaulting to
'slurm-cluster'")
+ monitorClusterID = "slurm-cluster"
+ }
+ if apiUrl == "" || user == "" || token == "" || apiVersion == "" {
+ slog.Warn("SLURM API credentials not fully provided, skipping
SLURM Usage Monitor connector")
+ slog.Warn("SLURM API credentials", "apiUrl", apiUrl, "user",
user, "token", token, "apiVersion", apiVersion)
+ return nil
+ }
+
+ slurmClient := client.New(apiUrl, user, token, apiVersion)
+ monitor := smonitor.NewSlurmMonitor(slurmClient, eventBus, coreService,
monitorClusterID)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ monitor.StartMonitor(ctx)
+ }()
+ return nil
+}
diff --git a/dev-ops/local-slurm/Makefile b/dev-ops/local-slurm/Makefile
index 4a97eb535..662ce55d9 100644
--- a/dev-ops/local-slurm/Makefile
+++ b/dev-ops/local-slurm/Makefile
@@ -23,6 +23,9 @@ smoke:
lint:
cd cli && go vet ./...
+login:
+ docker compose exec login bash
+
keys:
docker compose up init-keys
@@ -32,6 +35,15 @@ logs:
token:
docker compose exec login scontrol token
+token2:
+ docker compose exec login su - testuser -c "scontrol token"
+
+token3:
+ docker compose exec login su - testuser2 -c "scontrol token"
+
+token4:
+ docker compose exec login su - testuser3 -c "scontrol token"
+
allocations:
docker compose exec login sacct -n -o JobID,State --starttime
now-5minutes
diff --git a/dev-ops/local-slurm/README.md b/dev-ops/local-slurm/README.md
new file mode 100644
index 000000000..b6290e019
--- /dev/null
+++ b/dev-ops/local-slurm/README.md
@@ -0,0 +1,3 @@
+- `make up` — start the SLURM cluster
+- `make down` — stop the SLURM cluster
+- `make token` — print a REST API token
\ No newline at end of file
diff --git a/dev-ops/local-slurm/scripts/bootstrap-accounting.sh
b/dev-ops/local-slurm/scripts/bootstrap-accounting.sh
index 2c6427294..467e8730d 100755
--- a/dev-ops/local-slurm/scripts/bootstrap-accounting.sh
+++ b/dev-ops/local-slurm/scripts/bootstrap-accounting.sh
@@ -37,5 +37,11 @@ if ! sacctmgr -in show user format=user | grep -qw "root";
then
sacctmgr -i add user root Account=root AdminLevel=Administrator
fi
+#for u in testuser testuser2 testuser3; do
+# if ! sacctmgr -in show user format=user | grep -qw "$u"; then
+# sacctmgr -i add user "$u" Account=root
+# fi
+#done
+
touch "$SENTINEL"
echo "[bootstrap] done"
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index ea070208c..258a6d924 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie"
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage"
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
+
"github.com/apache/airavata-custos/connectors/SLURM/Usage-Monitor/pkg/monitor"
"github.com/apache/airavata-custos/pkg/events"
"github.com/apache/airavata-custos/pkg/service"
)
@@ -45,13 +46,18 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB,
eventBus *events.Bus
slog.Error("failed to load AMIE connector", "error", err)
return err
}
-
slog.Info("loading COmanage Identity-Provisioner connector")
if err := comanage.LoadConnector(ctx, database, eventBus, coreService,
wg); err != nil {
slog.Error("failed to load COmanage Identity-Provisioner
connector", "error", err)
return err
}
+ slog.Info("loading SLURM Usage Monitor connector")
+ if err := monitor.LoadConnector(ctx, database, eventBus, coreService,
wg); err != nil {
+ slog.Error("failed to load SLURM Usage Monitor connector",
"error", err)
+ return err
+ }
+
slog.Info("finished loading connectors")
return nil
}
diff --git a/internal/db/migrations/000004_allocations.up.sql
b/internal/db/migrations/000004_allocations.up.sql
index 8624ed724..6144394e2 100644
--- a/internal/db/migrations/000004_allocations.up.sql
+++ b/internal/db/migrations/000004_allocations.up.sql
@@ -165,9 +165,7 @@ CREATE TABLE IF NOT EXISTS compute_allocation_usages
KEY idx_compute_allocation_usages_allocation (compute_allocation_id,
calculated_time),
KEY idx_compute_allocation_usages_user (user_id),
KEY idx_compute_allocation_usages_job (job_id),
- KEY idx_compute_allocation_usages_resource
(compute_allocation_resource_id),
- CONSTRAINT fk_compute_allocation_usages_allocation FOREIGN KEY
(compute_allocation_id)
- REFERENCES compute_allocations (id) ON DELETE CASCADE
+ KEY idx_compute_allocation_usages_resource (compute_allocation_resource_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci;
CREATE TABLE IF NOT EXISTS compute_allocation_membership_resource_overrides
diff --git a/internal/db/migrations/000006_user_privileges.down.sql
b/internal/db/migrations/000006_compute_allocation_resource_cluster.down.sql
similarity index 76%
copy from internal/db/migrations/000006_user_privileges.down.sql
copy to
internal/db/migrations/000006_compute_allocation_resource_cluster.down.sql
index 162b3d7bc..e2dfaea9d 100644
--- a/internal/db/migrations/000006_user_privileges.down.sql
+++ b/internal/db/migrations/000006_compute_allocation_resource_cluster.down.sql
@@ -15,4 +15,8 @@
-- specific language governing permissions and limitations
-- under the License.
-DROP TABLE IF EXISTS user_privileges;
+ALTER TABLE compute_allocation_resources
+ DROP FOREIGN KEY fk_compute_allocation_resources_cluster,
+ DROP KEY uq_compute_allocation_resources_cluster_name,
+ DROP KEY idx_compute_allocation_resources_cluster,
+ DROP COLUMN compute_cluster_id;
diff --git a/internal/db/migrations/000006_user_privileges.down.sql
b/internal/db/migrations/000006_compute_allocation_resource_cluster.up.sql
similarity index 65%
copy from internal/db/migrations/000006_user_privileges.down.sql
copy to internal/db/migrations/000006_compute_allocation_resource_cluster.up.sql
index 162b3d7bc..f826a45ac 100644
--- a/internal/db/migrations/000006_user_privileges.down.sql
+++ b/internal/db/migrations/000006_compute_allocation_resource_cluster.up.sql
@@ -15,4 +15,9 @@
-- specific language governing permissions and limitations
-- under the License.
-DROP TABLE IF EXISTS user_privileges;
+ALTER TABLE compute_allocation_resources
+ ADD COLUMN compute_cluster_id VARCHAR(255) NOT NULL,
+ ADD KEY idx_compute_allocation_resources_cluster (compute_cluster_id),
+ ADD UNIQUE KEY uq_compute_allocation_resources_cluster_name
(compute_cluster_id, name),
+ ADD CONSTRAINT fk_compute_allocation_resources_cluster FOREIGN KEY
(compute_cluster_id)
+ REFERENCES compute_clusters (id) ON DELETE RESTRICT;
diff --git a/internal/db/migrations/000006_user_privileges.down.sql
b/internal/db/migrations/000007_user_privileges.down.sql
similarity index 100%
copy from internal/db/migrations/000006_user_privileges.down.sql
copy to internal/db/migrations/000007_user_privileges.down.sql
diff --git a/internal/db/migrations/000006_user_privileges.up.sql
b/internal/db/migrations/000007_user_privileges.up.sql
similarity index 100%
rename from internal/db/migrations/000006_user_privileges.up.sql
rename to internal/db/migrations/000007_user_privileges.up.sql
diff --git a/internal/db/migrations/000007_roles.down.sql
b/internal/db/migrations/000008_roles.down.sql
similarity index 100%
rename from internal/db/migrations/000007_roles.down.sql
rename to internal/db/migrations/000008_roles.down.sql
diff --git a/internal/db/migrations/000007_roles.up.sql
b/internal/db/migrations/000008_roles.up.sql
similarity index 100%
rename from internal/db/migrations/000007_roles.up.sql
rename to internal/db/migrations/000008_roles.up.sql
diff --git a/internal/db/migrations/000006_user_privileges.down.sql
b/internal/db/migrations/000009_update_allocation_usage_columns.down.sql
similarity index 78%
copy from internal/db/migrations/000006_user_privileges.down.sql
copy to internal/db/migrations/000009_update_allocation_usage_columns.down.sql
index 162b3d7bc..d092d87a5 100644
--- a/internal/db/migrations/000006_user_privileges.down.sql
+++ b/internal/db/migrations/000009_update_allocation_usage_columns.down.sql
@@ -15,4 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-DROP TABLE IF EXISTS user_privileges;
+-- Revert ComputeAllocationUsage table columns to BIGINT
+ALTER TABLE compute_allocation_usages
+ MODIFY COLUMN used_raw_amount BIGINT NOT NULL DEFAULT 0,
+ MODIFY COLUMN used_su_amount BIGINT NOT NULL DEFAULT 0;
diff --git a/internal/db/migrations/000006_user_privileges.down.sql
b/internal/db/migrations/000009_update_allocation_usage_columns.up.sql
similarity index 77%
rename from internal/db/migrations/000006_user_privileges.down.sql
rename to internal/db/migrations/000009_update_allocation_usage_columns.up.sql
index 162b3d7bc..a820fb318 100644
--- a/internal/db/migrations/000006_user_privileges.down.sql
+++ b/internal/db/migrations/000009_update_allocation_usage_columns.up.sql
@@ -15,4 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-DROP TABLE IF EXISTS user_privileges;
+-- Update ComputeAllocationUsage table columns to match model data types
+ALTER TABLE compute_allocation_usages
+ MODIFY COLUMN used_raw_amount DOUBLE NOT NULL DEFAULT 0,
+ MODIFY COLUMN used_su_amount DOUBLE NOT NULL DEFAULT 0;
diff --git a/internal/store/compute_allocation_resource_mapping_store.go
b/internal/store/compute_allocation_resource_mapping_store.go
index 9f05bf70b..7d96e41ca 100644
--- a/internal/store/compute_allocation_resource_mapping_store.go
+++ b/internal/store/compute_allocation_resource_mapping_store.go
@@ -71,7 +71,7 @@ func (s *mysqlComputeAllocationResourceMappingStore)
FindByPair(ctx context.Cont
func (s *mysqlComputeAllocationResourceMappingStore)
FindResourcesByAllocation(ctx context.Context, allocationID string)
([]models.ComputeAllocationResource, error) {
var resources []models.ComputeAllocationResource
err := s.db.SelectContext(ctx, &resources,
- `SELECT r.id, r.name, r.resource_type, r.resource_amount
+ `SELECT r.id, r.name, r.resource_type, r.resource_amount,
r.compute_cluster_id
FROM compute_allocation_resources r
JOIN compute_allocation_resource_mappings m
ON m.compute_allocation_resource_id = r.id
diff --git a/internal/store/compute_allocation_resource_store.go
b/internal/store/compute_allocation_resource_store.go
index efbd507a2..449eb3e38 100644
--- a/internal/store/compute_allocation_resource_store.go
+++ b/internal/store/compute_allocation_resource_store.go
@@ -40,7 +40,7 @@ func NewComputeAllocationResourceStore(db *sqlx.DB)
ComputeAllocationResourceSto
func (s *mysqlComputeAllocationResourceStore) FindByID(ctx context.Context, id
string) (*models.ComputeAllocationResource, error) {
var r models.ComputeAllocationResource
err := s.db.GetContext(ctx, &r,
- `SELECT id, name, resource_type, resource_amount FROM
compute_allocation_resources WHERE id = ?`, id)
+ `SELECT id, name, resource_type, resource_amount,
compute_cluster_id FROM compute_allocation_resources WHERE id = ?`, id)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
@@ -50,10 +50,38 @@ func (s *mysqlComputeAllocationResourceStore) FindByID(ctx
context.Context, id s
return &r, nil
}
+func (s *mysqlComputeAllocationResourceStore) FindByNameAndCluster(ctx
context.Context, name, clusterID string) (*models.ComputeAllocationResource,
error) {
+ var r models.ComputeAllocationResource
+ err := s.db.GetContext(ctx, &r,
+ `SELECT id, name, resource_type, resource_amount,
compute_cluster_id
+ FROM compute_allocation_resources
+ WHERE name = ? AND compute_cluster_id = ?`, name, clusterID)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &r, nil
+}
+
+func (s *mysqlComputeAllocationResourceStore) FindByTypeAndCluster(ctx
context.Context, resourceType, clusterID string)
([]models.ComputeAllocationResource, error) {
+ var resources []models.ComputeAllocationResource
+ err := s.db.SelectContext(ctx, &resources,
+ `SELECT id, name, resource_type, resource_amount,
compute_cluster_id
+ FROM compute_allocation_resources
+ WHERE resource_type = ? AND compute_cluster_id = ?
+ ORDER BY name`, resourceType, clusterID)
+ if err != nil {
+ return nil, err
+ }
+ return resources, nil
+}
+
func (s *mysqlComputeAllocationResourceStore) List(ctx context.Context)
([]models.ComputeAllocationResource, error) {
var resources []models.ComputeAllocationResource
err := s.db.SelectContext(ctx, &resources,
- `SELECT id, name, resource_type, resource_amount FROM
compute_allocation_resources ORDER BY name`)
+ `SELECT id, name, resource_type, resource_amount,
compute_cluster_id FROM compute_allocation_resources ORDER BY name`)
if err != nil {
return nil, err
}
@@ -62,18 +90,18 @@ func (s *mysqlComputeAllocationResourceStore) List(ctx
context.Context) ([]model
func (s *mysqlComputeAllocationResourceStore) Create(ctx context.Context, tx
*sql.Tx, r *models.ComputeAllocationResource) error {
_, err := tx.ExecContext(ctx,
- `INSERT INTO compute_allocation_resources (id, name,
resource_type, resource_amount)
- VALUES (?, ?, ?, ?)`,
- r.ID, r.Name, r.ResourceType, r.ResourceAmount)
+ `INSERT INTO compute_allocation_resources (id, name,
resource_type, resource_amount, compute_cluster_id)
+ VALUES (?, ?, ?, ?, ?)`,
+ r.ID, r.Name, r.ResourceType, r.ResourceAmount,
r.ComputeClusterID)
return err
}
func (s *mysqlComputeAllocationResourceStore) Update(ctx context.Context, tx
*sql.Tx, r *models.ComputeAllocationResource) error {
_, err := tx.ExecContext(ctx,
`UPDATE compute_allocation_resources
- SET name = ?, resource_type = ?, resource_amount = ?
+ SET name = ?, resource_type = ?, resource_amount = ?,
compute_cluster_id = ?
WHERE id = ?`,
- r.Name, r.ResourceType, r.ResourceAmount, r.ID)
+ r.Name, r.ResourceType, r.ResourceAmount, r.ComputeClusterID,
r.ID)
return err
}
diff --git a/internal/store/compute_allocation_usage_store.go
b/internal/store/compute_allocation_usage_store.go
index a039b64cd..9d1dbceba 100644
--- a/internal/store/compute_allocation_usage_store.go
+++ b/internal/store/compute_allocation_usage_store.go
@@ -78,6 +78,19 @@ func (s *mysqlComputeAllocationUsageStore) FindByUser(ctx
context.Context, userI
return rows, nil
}
+func (s *mysqlComputeAllocationUsageStore)
FindByComputeAllocationIDAndJobID(ctx context.Context, allocationID, jobID
string) (*models.ComputeAllocationUsage, error) {
+ var u models.ComputeAllocationUsage
+ err := s.db.GetContext(ctx, &u,
+ `SELECT `+computeAllocationUsageColumns+` FROM
compute_allocation_usages WHERE compute_allocation_id = ? AND job_id = ?`,
allocationID, jobID)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &u, nil
+}
+
func (s *mysqlComputeAllocationUsageStore) SumSUForAllocation(ctx
context.Context, allocationID string) (int64, error) {
var total sql.NullInt64
err := s.db.GetContext(ctx, &total,
diff --git a/internal/store/compute_cluster_user_store.go
b/internal/store/compute_cluster_user_store.go
index 8e56b0c60..1888aa4a0 100644
--- a/internal/store/compute_cluster_user_store.go
+++ b/internal/store/compute_cluster_user_store.go
@@ -67,6 +67,21 @@ func (s *mysqlComputeClusterUserStore) FindByPair(ctx
context.Context, clusterID
return &c, nil
}
+func (s *mysqlComputeClusterUserStore) FindByLocalUsernameAndCluster(ctx
context.Context, clusterID, localUsername string) (*models.ComputeClusterUser,
error) {
+ var c models.ComputeClusterUser
+ err := s.db.GetContext(ctx, &c,
+ `SELECT `+computeClusterUserColumns+`
+ FROM compute_cluster_users
+ WHERE compute_cluster_id = ? AND local_username = ?`, clusterID,
localUsername)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return nil, nil
+ }
+ return nil, err
+ }
+ return &c, nil
+}
+
func (s *mysqlComputeClusterUserStore) FindByCluster(ctx context.Context,
clusterID string) ([]models.ComputeClusterUser, error) {
var users []models.ComputeClusterUser
err := s.db.SelectContext(ctx, &users,
diff --git a/internal/store/store.go b/internal/store/store.go
index b61f37ecf..385bfb927 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -80,6 +80,8 @@ type ComputeClusterUserStore interface {
FindByID(ctx context.Context, id string) (*models.ComputeClusterUser,
error)
// FindByPair returns the mapping for a (compute_cluster_id, user_id)
pair, or nil if absent.
FindByPair(ctx context.Context, clusterID, userID string)
(*models.ComputeClusterUser, error)
+ // FindByLocalUsernameAndCluster returns the mapping for a
(local_username, compute_cluster_id) pair, or nil if absent.
+ FindByLocalUsernameAndCluster(ctx context.Context, clusterID,
localUsername string) (*models.ComputeClusterUser, error)
// FindByCluster returns every user mapping for the given compute
cluster.
FindByCluster(ctx context.Context, clusterID string)
([]models.ComputeClusterUser, error)
// FindByUser returns every cluster mapping held by the given Custos
user.
@@ -158,6 +160,12 @@ type ComputeAllocationStore interface {
type ComputeAllocationResourceStore interface {
// FindByID returns the resource with the given ID, or nil if not found.
FindByID(ctx context.Context, id string)
(*models.ComputeAllocationResource, error)
+ // FindByNameAndCluster returns the resource with the given name on the
+ // given compute cluster, or nil if not found.
+ FindByNameAndCluster(ctx context.Context, name, clusterID string)
(*models.ComputeAllocationResource, error)
+ // FindByTypeAndCluster returns all resources of the given type on the
+ // given compute cluster.
+ FindByTypeAndCluster(ctx context.Context, resourceType, clusterID
string) ([]models.ComputeAllocationResource, error)
// List returns all compute allocation resources.
List(ctx context.Context) ([]models.ComputeAllocationResource, error)
// Create inserts a new resource within the provided transaction.
@@ -390,6 +398,9 @@ type ComputeAllocationUsageStore interface {
// FindByUser returns every usage event attributed to the given user,
// ordered by calculated_time ascending.
FindByUser(ctx context.Context, userID string)
([]models.ComputeAllocationUsage, error)
+ // FindByComputeAllocationIDAndJobID returns the usage event for the
given
+ // compute allocation ID and job ID, or nil if it does not exist.
+ FindByComputeAllocationIDAndJobID(ctx context.Context, allocationID,
jobID string) (*models.ComputeAllocationUsage, error)
// SumSUForAllocation returns the total SUs consumed against the given
// allocation across all usage events.
SumSUForAllocation(ctx context.Context, allocationID string) (int64,
error)
diff --git a/pkg/models/allocation.go b/pkg/models/allocation.go
index a8fcd3976..b372d2557 100644
--- a/pkg/models/allocation.go
+++ b/pkg/models/allocation.go
@@ -35,10 +35,11 @@ type ComputeAllocation struct {
// Typically store the a paritition information
type ComputeAllocationResource struct {
- ID string `json:"id" db:"id"`
- Name string `json:"name" db:"name"` //
resource / partition name, e.g., "cpu-01", "gpu-01", "gpu-interactive", etc.
- ResourceType string `json:"resource_type" db:"resource_type"` //
cpu, gpu
- ResourceAmount int64 `json:"resource_amount" db:"resource_amount"` //
Number of CPUs, GPUs.
+ ID string `json:"id" db:"id"`
+ Name string `json:"name" db:"name"`
// resource / partition name, e.g., "cpu-01", "gpu-01", "gpu-interactive",
etc.
+ ResourceType string `json:"resource_type" db:"resource_type"`
// TRES: cpu, gres/gpu
+ ResourceAmount int64 `json:"resource_amount"
db:"resource_amount"` // Number of CPUs, GPUs.
+ ComputeClusterID string `json:"compute_cluster_id"
db:"compute_cluster_id"` // The ID of the compute cluster the resource
(partition) belongs to.
}
// Store the association amount for a parition and allocation
@@ -91,8 +92,8 @@ type ComputeAllocationChangeRequestEvent struct {
type ComputeAllocationUsage struct { // Represents the usage of a compute
allocation, e.g., when a job consumes some of the allocated SUs, etc.
ID string `json:"id"
db:"id"`
ComputeAllocationID string `json:"compute_allocation_id"
db:"compute_allocation_id"`
- UsedRawAmount int64 `json:"used_raw_amount"
db:"used_raw_amount"` // The raw amount of resource used,
e.g., 20 CPU hours, 10 GPU hours, etc.
- UsedSUAmount int64 `json:"used_su_amount"
db:"used_su_amount"` // SUs used by the allocation, e.g.,
200 SUs, etc.
+ UsedRawAmount float64 `json:"used_raw_amount"
db:"used_raw_amount"` // The raw amount of resource used,
e.g., 20 CPU hours, 10 GPU hours, etc.
+ UsedSUAmount float64 `json:"used_su_amount"
db:"used_su_amount"` // SUs used by the allocation, e.g.,
200 SUs, etc.
CalculatedTime time.Time `json:"last_updated"
db:"calculated_time"` // The last time the usage was
updated. SU should be calculated up to this point in time and charge rates
should be applied based on the rates effective at this time.
UserID string `json:"user_id"
db:"user_id"` // The ID of the user who used the
allocation.
JobID string `json:"job_id"
db:"job_id"` // The ID of the job that consumed
the allocation.
diff --git a/pkg/service/compute_allocation_resource.go
b/pkg/service/compute_allocation_resource.go
index 4efb28b51..70effcf44 100644
--- a/pkg/service/compute_allocation_resource.go
+++ b/pkg/service/compute_allocation_resource.go
@@ -38,6 +38,14 @@ func (s *Service) CreateComputeAllocationResource(ctx
context.Context, resource
if resource.ResourceType == "" {
return nil, fmt.Errorf("%w: resource_type is required",
ErrInvalidInput)
}
+ if resource.ComputeClusterID == "" {
+ return nil, fmt.Errorf("%w: compute_cluster_id is required",
ErrInvalidInput)
+ }
+ if cluster, err := s.clusters.FindByID(ctx, resource.ComputeClusterID);
err != nil {
+ return nil, fmt.Errorf("lookup compute cluster: %w", err)
+ } else if cluster == nil {
+ return nil, fmt.Errorf("%w: compute cluster %q not found",
ErrInvalidInput, resource.ComputeClusterID)
+ }
if resource.ID == "" {
resource.ID = newID()
}
@@ -65,6 +73,26 @@ func (s *Service) GetComputeAllocationResource(ctx
context.Context, id string) (
return r, nil
}
+// GetComputeAllocationResourceByNameAndCluster retrieves a compute allocation
+// resource by its name within a given compute cluster. Returns ErrNotFound
+// when no resource matches.
+func (s *Service) GetComputeAllocationResourceByNameAndCluster(ctx
context.Context, name, clusterID string) (*models.ComputeAllocationResource,
error) {
+ if name == "" {
+ return nil, fmt.Errorf("%w: resource name is required",
ErrInvalidInput)
+ }
+ if clusterID == "" {
+ return nil, fmt.Errorf("%w: compute_cluster_id is required",
ErrInvalidInput)
+ }
+ r, err := s.resources.FindByNameAndCluster(ctx, name, clusterID)
+ if err != nil {
+ return nil, fmt.Errorf("get compute allocation resource by name
and cluster: %w", err)
+ }
+ if r == nil {
+ return nil, ErrNotFound
+ }
+ return r, nil
+}
+
// ListComputeAllocationResources returns every compute allocation resource.
func (s *Service) ListComputeAllocationResources(ctx context.Context)
([]models.ComputeAllocationResource, error) {
resources, err := s.resources.List(ctx)
@@ -74,6 +102,22 @@ func (s *Service) ListComputeAllocationResources(ctx
context.Context) ([]models.
return resources, nil
}
+// ListComputeAllocationResourcesByTypeAndCluster returns all resources of the
+// given type on the given compute cluster.
+func (s *Service) ListComputeAllocationResourcesByTypeAndCluster(ctx
context.Context, resourceType, clusterID string)
([]models.ComputeAllocationResource, error) {
+ if resourceType == "" {
+ return nil, fmt.Errorf("%w: resource_type is required",
ErrInvalidInput)
+ }
+ if clusterID == "" {
+ return nil, fmt.Errorf("%w: compute_cluster_id is required",
ErrInvalidInput)
+ }
+ resources, err := s.resources.FindByTypeAndCluster(ctx, resourceType,
clusterID)
+ if err != nil {
+ return nil, fmt.Errorf("list compute allocation resources by
type and cluster: %w", err)
+ }
+ return resources, nil
+}
+
// UpdateComputeAllocationResource persists changes to an existing resource.
func (s *Service) UpdateComputeAllocationResource(ctx context.Context,
resource *models.ComputeAllocationResource) error {
if resource == nil || resource.ID == "" {
diff --git a/pkg/service/compute_allocation_usage.go
b/pkg/service/compute_allocation_usage.go
index 38e9900bc..1259aa438 100644
--- a/pkg/service/compute_allocation_usage.go
+++ b/pkg/service/compute_allocation_usage.go
@@ -81,6 +81,25 @@ func (s *Service) GetComputeAllocationUsage(ctx
context.Context, id string) (*mo
return u, nil
}
+// GetComputeAllocationUsageByComputeAllocationIDAndJobID retrieves a usage
event
+// by the given compute allocation ID and job ID. Returns ErrNotFound when no
usage matches.
+func (s *Service) GetComputeAllocationUsageByComputeAllocationIDAndJobID(ctx
context.Context, allocationID, jobID string) (*models.ComputeAllocationUsage,
error) {
+ if allocationID == "" {
+ return nil, fmt.Errorf("%w: compute_allocation_id is required",
ErrInvalidInput)
+ }
+ if jobID == "" {
+ return nil, fmt.Errorf("%w: job_id is required",
ErrInvalidInput)
+ }
+ u, err := s.usages.FindByComputeAllocationIDAndJobID(ctx, allocationID,
jobID)
+ if err != nil {
+ return nil, fmt.Errorf("get compute allocation usage by
allocation id and job id: %w", err)
+ }
+ if u == nil {
+ return nil, ErrNotFound
+ }
+ return u, nil
+}
+
// ListUsagesForAllocation returns every usage event recorded against the
// given allocation, ordered by calculated_time ascending.
func (s *Service) ListUsagesForAllocation(ctx context.Context, allocationID
string) ([]models.ComputeAllocationUsage, error) {
diff --git a/pkg/service/compute_cluster_user.go
b/pkg/service/compute_cluster_user.go
index 0aeca1f27..33489ed9a 100644
--- a/pkg/service/compute_cluster_user.go
+++ b/pkg/service/compute_cluster_user.go
@@ -122,6 +122,25 @@ func (s *Service) GetComputeClusterUserByPair(ctx
context.Context, clusterID, us
return c, nil
}
+// GetComputeClusterUserByLocalUsernameAndCluster retrieves the compute-cluster
+// user mapping for the given (local_username, compute_cluster_id) pair.
+func (s *Service) GetComputeClusterUserByLocalUsernameAndCluster(ctx
context.Context, clusterID, localUsername string) (*models.ComputeClusterUser,
error) {
+ if clusterID == "" {
+ return nil, fmt.Errorf("%w: compute_cluster_id is required",
ErrInvalidInput)
+ }
+ if localUsername == "" {
+ return nil, fmt.Errorf("%w: local_username is required",
ErrInvalidInput)
+ }
+ c, err := s.clusterUsers.FindByLocalUsernameAndCluster(ctx, clusterID,
localUsername)
+ if err != nil {
+ return nil, fmt.Errorf("get compute cluster user by local
username and cluster: %w", err)
+ }
+ if c == nil {
+ return nil, ErrNotFound
+ }
+ return c, nil
+}
+
// ListComputeClusterUsersByCluster returns every user mapping for the given
// compute cluster, ordered by local username.
func (s *Service) ListComputeClusterUsersByCluster(ctx context.Context,
clusterID string) ([]models.ComputeClusterUser, error) {
diff --git a/pkg/service/interface.go b/pkg/service/interface.go
index 1a36e70a3..d76736a06 100644
--- a/pkg/service/interface.go
+++ b/pkg/service/interface.go
@@ -98,6 +98,7 @@ type ComputeClusterUserService interface {
CreateComputeClusterUser(ctx context.Context, cu
*models.ComputeClusterUser) (*models.ComputeClusterUser, error)
GetComputeClusterUser(ctx context.Context, id string)
(*models.ComputeClusterUser, error)
GetComputeClusterUserByPair(ctx context.Context, clusterID, userID
string) (*models.ComputeClusterUser, error)
+ GetComputeClusterUserByLocalUsernameAndCluster(ctx context.Context,
clusterID, localUsername string) (*models.ComputeClusterUser, error)
ListComputeClusterUsersByCluster(ctx context.Context, clusterID string)
([]models.ComputeClusterUser, error)
ListComputeClusterUsersByUser(ctx context.Context, userID string)
([]models.ComputeClusterUser, error)
UpdateComputeClusterUser(ctx context.Context, cu
*models.ComputeClusterUser) error
@@ -118,7 +119,9 @@ type ComputeAllocationService interface {
type ComputeAllocationResourceService interface {
CreateComputeAllocationResource(ctx context.Context, resource
*models.ComputeAllocationResource) (*models.ComputeAllocationResource, error)
GetComputeAllocationResource(ctx context.Context, id string)
(*models.ComputeAllocationResource, error)
+ GetComputeAllocationResourceByNameAndCluster(ctx context.Context, name,
clusterID string) (*models.ComputeAllocationResource, error)
ListComputeAllocationResources(ctx context.Context)
([]models.ComputeAllocationResource, error)
+ ListComputeAllocationResourcesByTypeAndCluster(ctx context.Context,
resourceType, clusterID string) ([]models.ComputeAllocationResource, error)
UpdateComputeAllocationResource(ctx context.Context, resource
*models.ComputeAllocationResource) error
DeleteComputeAllocationResource(ctx context.Context, id string) error
}
@@ -197,6 +200,7 @@ type ComputeAllocationMembershipResourceOverrideService
interface {
type ComputeAllocationUsageService interface {
CreateComputeAllocationUsage(ctx context.Context, u
*models.ComputeAllocationUsage) (*models.ComputeAllocationUsage, error)
GetComputeAllocationUsage(ctx context.Context, id string)
(*models.ComputeAllocationUsage, error)
+ GetComputeAllocationUsageByComputeAllocationIDAndJobID(ctx
context.Context, allocationID, jobID string) (*models.ComputeAllocationUsage,
error)
ListUsagesForAllocation(ctx context.Context, allocationID string)
([]models.ComputeAllocationUsage, error)
ListUsagesByUser(ctx context.Context, userID string)
([]models.ComputeAllocationUsage, error)
GetTotalSUUsageForAllocation(ctx context.Context, allocationID string)
(int64, error)
diff --git a/pkg/service/mock.go b/pkg/service/mock.go
index b1420d626..99b2c7dc4 100644
--- a/pkg/service/mock.go
+++ b/pkg/service/mock.go
@@ -164,12 +164,18 @@ var _ CoreService = &CoreServiceMock{}
// GetComputeAllocationResourceFunc: func(ctx
context.Context, id string) (*models.ComputeAllocationResource, error) {
// panic("mock out the
GetComputeAllocationResource method")
// },
+// GetComputeAllocationResourceByNameAndClusterFunc:
func(ctx context.Context, name string, clusterID string)
(*models.ComputeAllocationResource, error) {
+// panic("mock out the
GetComputeAllocationResourceByNameAndCluster method")
+// },
// GetComputeAllocationResourceRateFunc: func(ctx
context.Context, id string) (*models.ComputeAllocationResourceRate, error) {
// panic("mock out the
GetComputeAllocationResourceRate method")
// },
// GetComputeAllocationUsageFunc: func(ctx
context.Context, id string) (*models.ComputeAllocationUsage, error) {
// panic("mock out the GetComputeAllocationUsage
method")
// },
+//
GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc: func(ctx
context.Context, allocationID string, jobID string)
(*models.ComputeAllocationUsage, error) {
+// panic("mock out the
GetComputeAllocationUsageByComputeAllocationIDAndJobID method")
+// },
// GetComputeClusterFunc: func(ctx context.Context, id
string) (*models.ComputeCluster, error) {
// panic("mock out the GetComputeCluster method")
// },
@@ -179,6 +185,9 @@ var _ CoreService = &CoreServiceMock{}
// GetComputeClusterUserFunc: func(ctx context.Context, id
string) (*models.ComputeClusterUser, error) {
// panic("mock out the GetComputeClusterUser
method")
// },
+// GetComputeClusterUserByLocalUsernameAndClusterFunc:
func(ctx context.Context, clusterID string, localUsername string)
(*models.ComputeClusterUser, error) {
+// panic("mock out the
GetComputeClusterUserByLocalUsernameAndCluster method")
+// },
// GetComputeClusterUserByPairFunc: func(ctx
context.Context, clusterID string, userID string) (*models.ComputeClusterUser,
error) {
// panic("mock out the GetComputeClusterUserByPair
method")
// },
@@ -263,6 +272,9 @@ var _ CoreService = &CoreServiceMock{}
// ListComputeAllocationResourcesFunc: func(ctx
context.Context) ([]models.ComputeAllocationResource, error) {
// panic("mock out the
ListComputeAllocationResources method")
// },
+// ListComputeAllocationResourcesByTypeAndClusterFunc:
func(ctx context.Context, resourceType string, clusterID string)
([]models.ComputeAllocationResource, error) {
+// panic("mock out the
ListComputeAllocationResourcesByTypeAndCluster method")
+// },
// ListComputeAllocationsByClusterFunc: func(ctx
context.Context, clusterID string) ([]models.ComputeAllocation, error) {
// panic("mock out the
ListComputeAllocationsByCluster method")
// },
@@ -549,12 +561,18 @@ type CoreServiceMock struct {
// GetComputeAllocationResourceFunc mocks the
GetComputeAllocationResource method.
GetComputeAllocationResourceFunc func(ctx context.Context, id string)
(*models.ComputeAllocationResource, error)
+ // GetComputeAllocationResourceByNameAndClusterFunc mocks the
GetComputeAllocationResourceByNameAndCluster method.
+ GetComputeAllocationResourceByNameAndClusterFunc func(ctx
context.Context, name string, clusterID string)
(*models.ComputeAllocationResource, error)
+
// GetComputeAllocationResourceRateFunc mocks the
GetComputeAllocationResourceRate method.
GetComputeAllocationResourceRateFunc func(ctx context.Context, id
string) (*models.ComputeAllocationResourceRate, error)
// GetComputeAllocationUsageFunc mocks the GetComputeAllocationUsage
method.
GetComputeAllocationUsageFunc func(ctx context.Context, id string)
(*models.ComputeAllocationUsage, error)
+ // GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc mocks the
GetComputeAllocationUsageByComputeAllocationIDAndJobID method.
+ GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc func(ctx
context.Context, allocationID string, jobID string)
(*models.ComputeAllocationUsage, error)
+
// GetComputeClusterFunc mocks the GetComputeCluster method.
GetComputeClusterFunc func(ctx context.Context, id string)
(*models.ComputeCluster, error)
@@ -564,6 +582,9 @@ type CoreServiceMock struct {
// GetComputeClusterUserFunc mocks the GetComputeClusterUser method.
GetComputeClusterUserFunc func(ctx context.Context, id string)
(*models.ComputeClusterUser, error)
+ // GetComputeClusterUserByLocalUsernameAndClusterFunc mocks the
GetComputeClusterUserByLocalUsernameAndCluster method.
+ GetComputeClusterUserByLocalUsernameAndClusterFunc func(ctx
context.Context, clusterID string, localUsername string)
(*models.ComputeClusterUser, error)
+
// GetComputeClusterUserByPairFunc mocks the
GetComputeClusterUserByPair method.
GetComputeClusterUserByPairFunc func(ctx context.Context, clusterID
string, userID string) (*models.ComputeClusterUser, error)
@@ -648,6 +669,9 @@ type CoreServiceMock struct {
// ListComputeAllocationResourcesFunc mocks the
ListComputeAllocationResources method.
ListComputeAllocationResourcesFunc func(ctx context.Context)
([]models.ComputeAllocationResource, error)
+ // ListComputeAllocationResourcesByTypeAndClusterFunc mocks the
ListComputeAllocationResourcesByTypeAndCluster method.
+ ListComputeAllocationResourcesByTypeAndClusterFunc func(ctx
context.Context, resourceType string, clusterID string)
([]models.ComputeAllocationResource, error)
+
// ListComputeAllocationsByClusterFunc mocks the
ListComputeAllocationsByCluster method.
ListComputeAllocationsByClusterFunc func(ctx context.Context, clusterID
string) ([]models.ComputeAllocation, error)
@@ -1143,6 +1167,15 @@ type CoreServiceMock struct {
// ID is the id argument value.
ID string
}
+ // GetComputeAllocationResourceByNameAndCluster holds details
about calls to the GetComputeAllocationResourceByNameAndCluster method.
+ GetComputeAllocationResourceByNameAndCluster []struct {
+ // Ctx is the ctx argument value.
+ Ctx context.Context
+ // Name is the name argument value.
+ Name string
+ // ClusterID is the clusterID argument value.
+ ClusterID string
+ }
// GetComputeAllocationResourceRate holds details about calls
to the GetComputeAllocationResourceRate method.
GetComputeAllocationResourceRate []struct {
// Ctx is the ctx argument value.
@@ -1157,6 +1190,15 @@ type CoreServiceMock struct {
// ID is the id argument value.
ID string
}
+ // GetComputeAllocationUsageByComputeAllocationIDAndJobID holds
details about calls to the
GetComputeAllocationUsageByComputeAllocationIDAndJobID method.
+ GetComputeAllocationUsageByComputeAllocationIDAndJobID []struct
{
+ // Ctx is the ctx argument value.
+ Ctx context.Context
+ // AllocationID is the allocationID argument value.
+ AllocationID string
+ // JobID is the jobID argument value.
+ JobID string
+ }
// GetComputeCluster holds details about calls to the
GetComputeCluster method.
GetComputeCluster []struct {
// Ctx is the ctx argument value.
@@ -1178,6 +1220,15 @@ type CoreServiceMock struct {
// ID is the id argument value.
ID string
}
+ // GetComputeClusterUserByLocalUsernameAndCluster holds details
about calls to the GetComputeClusterUserByLocalUsernameAndCluster method.
+ GetComputeClusterUserByLocalUsernameAndCluster []struct {
+ // Ctx is the ctx argument value.
+ Ctx context.Context
+ // ClusterID is the clusterID argument value.
+ ClusterID string
+ // LocalUsername is the localUsername argument value.
+ LocalUsername string
+ }
// GetComputeClusterUserByPair holds details about calls to the
GetComputeClusterUserByPair method.
GetComputeClusterUserByPair []struct {
// Ctx is the ctx argument value.
@@ -1394,6 +1445,15 @@ type CoreServiceMock struct {
// Ctx is the ctx argument value.
Ctx context.Context
}
+ // ListComputeAllocationResourcesByTypeAndCluster holds details
about calls to the ListComputeAllocationResourcesByTypeAndCluster method.
+ ListComputeAllocationResourcesByTypeAndCluster []struct {
+ // Ctx is the ctx argument value.
+ Ctx context.Context
+ // ResourceType is the resourceType argument value.
+ ResourceType string
+ // ClusterID is the clusterID argument value.
+ ClusterID string
+ }
// ListComputeAllocationsByCluster holds details about calls to
the ListComputeAllocationsByCluster method.
ListComputeAllocationsByCluster []struct {
// Ctx is the ctx argument value.
@@ -1738,132 +1798,136 @@ type CoreServiceMock struct {
Status models.UserStatus
}
}
- lockAddPrivilegeToRole sync.RWMutex
- lockAttachResourceToAllocation sync.RWMutex
- lockBootstrapSuperAdmin sync.RWMutex
- lockCreateAuditEvent sync.RWMutex
- lockCreateComputeAllocation sync.RWMutex
- lockCreateComputeAllocationChangeRequest sync.RWMutex
- lockCreateComputeAllocationChangeRequestEvent sync.RWMutex
- lockCreateComputeAllocationDiff sync.RWMutex
- lockCreateComputeAllocationMembership sync.RWMutex
- lockCreateComputeAllocationMembershipResourceOverride sync.RWMutex
- lockCreateComputeAllocationResource sync.RWMutex
- lockCreateComputeAllocationResourceRate sync.RWMutex
- lockCreateComputeAllocationUsage sync.RWMutex
- lockCreateComputeCluster sync.RWMutex
- lockCreateComputeClusterUser sync.RWMutex
- lockCreateOrganization sync.RWMutex
- lockCreateProject sync.RWMutex
- lockCreateRole sync.RWMutex
- lockCreateUser sync.RWMutex
- lockCreateUserIdentity sync.RWMutex
- lockDeleteAuditEvent sync.RWMutex
- lockDeleteComputeAllocation sync.RWMutex
- lockDeleteComputeAllocationChangeRequest sync.RWMutex
- lockDeleteComputeAllocationChangeRequestEvent sync.RWMutex
- lockDeleteComputeAllocationDiff sync.RWMutex
- lockDeleteComputeAllocationMembership sync.RWMutex
- lockDeleteComputeAllocationMembershipResourceOverride sync.RWMutex
- lockDeleteComputeAllocationResource sync.RWMutex
- lockDeleteComputeAllocationResourceRate sync.RWMutex
- lockDeleteComputeAllocationUsage sync.RWMutex
- lockDeleteComputeCluster sync.RWMutex
- lockDeleteComputeClusterUser sync.RWMutex
- lockDeleteOrganization sync.RWMutex
- lockDeleteProject sync.RWMutex
- lockDeleteRole sync.RWMutex
- lockDeleteUser sync.RWMutex
- lockDeleteUserIdentity sync.RWMutex
- lockDetachResourceFromAllocation sync.RWMutex
- lockEffectivePrivileges sync.RWMutex
- lockGetAuditEvent sync.RWMutex
- lockGetComputeAllocation sync.RWMutex
- lockGetComputeAllocationChangeRequest sync.RWMutex
- lockGetComputeAllocationChangeRequestEvent sync.RWMutex
- lockGetComputeAllocationDiff sync.RWMutex
- lockGetComputeAllocationMembership sync.RWMutex
- lockGetComputeAllocationMembershipResourceOverride sync.RWMutex
- lockGetComputeAllocationMembershipResourceOverrideByPair sync.RWMutex
- lockGetComputeAllocationResource sync.RWMutex
- lockGetComputeAllocationResourceRate sync.RWMutex
- lockGetComputeAllocationUsage sync.RWMutex
- lockGetComputeCluster sync.RWMutex
- lockGetComputeClusterByName sync.RWMutex
- lockGetComputeClusterUser sync.RWMutex
- lockGetComputeClusterUserByPair sync.RWMutex
- lockGetEffectiveRateForResource sync.RWMutex
- lockGetLatestDiffForAllocation sync.RWMutex
- lockGetLatestEventForChangeRequest sync.RWMutex
- lockGetOrganization sync.RWMutex
- lockGetOrganizationByOriginatedID sync.RWMutex
- lockGetProject sync.RWMutex
- lockGetProjectByOriginatedID sync.RWMutex
- lockGetRole sync.RWMutex
- lockGetTotalSUUsageForAllocation sync.RWMutex
- lockGetTotalSUUsageForUserInAllocation sync.RWMutex
- lockGetUser sync.RWMutex
- lockGetUserByEmail sync.RWMutex
- lockGetUserByUserIdentity sync.RWMutex
- lockGetUserIdentity sync.RWMutex
- lockGetUserIdentityByOIDCSub sync.RWMutex
- lockGetUserIdentityBySourceAndExternalID sync.RWMutex
- lockGrantPrivilege sync.RWMutex
- lockGrantRoleToUser sync.RWMutex
- lockHasPrivilege sync.RWMutex
- lockListAllAuditEvents sync.RWMutex
- lockListAllocationsForResource sync.RWMutex
- lockListAllocationsForUser sync.RWMutex
- lockListAuditEventsByEntity sync.RWMutex
- lockListAuditEventsByEventType sync.RWMutex
- lockListChangeRequestsByRequester sync.RWMutex
- lockListChangeRequestsForAllocation sync.RWMutex
- lockListComputeAllocationResources sync.RWMutex
- lockListComputeAllocationsByCluster sync.RWMutex
- lockListComputeAllocationsByProject sync.RWMutex
- lockListComputeClusterUsersByCluster sync.RWMutex
- lockListComputeClusterUsersByUser sync.RWMutex
- lockListComputeClusters sync.RWMutex
- lockListDiffsForAllocation sync.RWMutex
- lockListEventsForChangeRequest sync.RWMutex
- lockListMembersForAllocation sync.RWMutex
- lockListOverridesForMembership sync.RWMutex
- lockListOverridesForResource sync.RWMutex
- lockListPrivilegeHolders sync.RWMutex
- lockListProjectsByPI sync.RWMutex
- lockListRatesForResource sync.RWMutex
- lockListResourcesForAllocation sync.RWMutex
- lockListRoleHolders sync.RWMutex
- lockListRolePrivileges sync.RWMutex
- lockListRoles sync.RWMutex
- lockListUsagesByUser sync.RWMutex
- lockListUsagesForAllocation sync.RWMutex
- lockListUserIdentitiesForUser sync.RWMutex
- lockListUserPrivileges sync.RWMutex
- lockListUserRoles sync.RWMutex
- lockListUsersByOrganization sync.RWMutex
- lockMergeUsers sync.RWMutex
- lockPrivilegeCatalog sync.RWMutex
- lockRemovePrivilegeFromRole sync.RWMutex
- lockRevokePrivilege sync.RWMutex
- lockRevokeRoleFromUser sync.RWMutex
- lockUpdateAllocationResourceMapping sync.RWMutex
- lockUpdateComputeAllocation sync.RWMutex
- lockUpdateComputeAllocationChangeRequest sync.RWMutex
- lockUpdateComputeAllocationMembership sync.RWMutex
- lockUpdateComputeAllocationMembershipResourceOverride sync.RWMutex
- lockUpdateComputeAllocationResource sync.RWMutex
- lockUpdateComputeAllocationResourceRate sync.RWMutex
- lockUpdateComputeCluster sync.RWMutex
- lockUpdateComputeClusterUser sync.RWMutex
- lockUpdateMembershipStatus sync.RWMutex
- lockUpdateOrganization sync.RWMutex
- lockUpdateProject sync.RWMutex
- lockUpdateProjectStatus sync.RWMutex
- lockUpdateRole sync.RWMutex
- lockUpdateUser sync.RWMutex
- lockUpdateUserIdentity sync.RWMutex
- lockUpdateUserStatus sync.RWMutex
+ lockAddPrivilegeToRole sync.RWMutex
+ lockAttachResourceToAllocation sync.RWMutex
+ lockBootstrapSuperAdmin sync.RWMutex
+ lockCreateAuditEvent sync.RWMutex
+ lockCreateComputeAllocation sync.RWMutex
+ lockCreateComputeAllocationChangeRequest sync.RWMutex
+ lockCreateComputeAllocationChangeRequestEvent sync.RWMutex
+ lockCreateComputeAllocationDiff sync.RWMutex
+ lockCreateComputeAllocationMembership sync.RWMutex
+ lockCreateComputeAllocationMembershipResourceOverride sync.RWMutex
+ lockCreateComputeAllocationResource sync.RWMutex
+ lockCreateComputeAllocationResourceRate sync.RWMutex
+ lockCreateComputeAllocationUsage sync.RWMutex
+ lockCreateComputeCluster sync.RWMutex
+ lockCreateComputeClusterUser sync.RWMutex
+ lockCreateOrganization sync.RWMutex
+ lockCreateProject sync.RWMutex
+ lockCreateRole sync.RWMutex
+ lockCreateUser sync.RWMutex
+ lockCreateUserIdentity sync.RWMutex
+ lockDeleteAuditEvent sync.RWMutex
+ lockDeleteComputeAllocation sync.RWMutex
+ lockDeleteComputeAllocationChangeRequest sync.RWMutex
+ lockDeleteComputeAllocationChangeRequestEvent sync.RWMutex
+ lockDeleteComputeAllocationDiff sync.RWMutex
+ lockDeleteComputeAllocationMembership sync.RWMutex
+ lockDeleteComputeAllocationMembershipResourceOverride sync.RWMutex
+ lockDeleteComputeAllocationResource sync.RWMutex
+ lockDeleteComputeAllocationResourceRate sync.RWMutex
+ lockDeleteComputeAllocationUsage sync.RWMutex
+ lockDeleteComputeCluster sync.RWMutex
+ lockDeleteComputeClusterUser sync.RWMutex
+ lockDeleteOrganization sync.RWMutex
+ lockDeleteProject sync.RWMutex
+ lockDeleteRole sync.RWMutex
+ lockDeleteUser sync.RWMutex
+ lockDeleteUserIdentity sync.RWMutex
+ lockDetachResourceFromAllocation sync.RWMutex
+ lockEffectivePrivileges sync.RWMutex
+ lockGetAuditEvent sync.RWMutex
+ lockGetComputeAllocation sync.RWMutex
+ lockGetComputeAllocationChangeRequest sync.RWMutex
+ lockGetComputeAllocationChangeRequestEvent sync.RWMutex
+ lockGetComputeAllocationDiff sync.RWMutex
+ lockGetComputeAllocationMembership sync.RWMutex
+ lockGetComputeAllocationMembershipResourceOverride sync.RWMutex
+ lockGetComputeAllocationMembershipResourceOverrideByPair sync.RWMutex
+ lockGetComputeAllocationResource sync.RWMutex
+ lockGetComputeAllocationResourceByNameAndCluster sync.RWMutex
+ lockGetComputeAllocationResourceRate sync.RWMutex
+ lockGetComputeAllocationUsage sync.RWMutex
+ lockGetComputeAllocationUsageByComputeAllocationIDAndJobID sync.RWMutex
+ lockGetComputeCluster sync.RWMutex
+ lockGetComputeClusterByName sync.RWMutex
+ lockGetComputeClusterUser sync.RWMutex
+ lockGetComputeClusterUserByLocalUsernameAndCluster sync.RWMutex
+ lockGetComputeClusterUserByPair sync.RWMutex
+ lockGetEffectiveRateForResource sync.RWMutex
+ lockGetLatestDiffForAllocation sync.RWMutex
+ lockGetLatestEventForChangeRequest sync.RWMutex
+ lockGetOrganization sync.RWMutex
+ lockGetOrganizationByOriginatedID sync.RWMutex
+ lockGetProject sync.RWMutex
+ lockGetProjectByOriginatedID sync.RWMutex
+ lockGetRole sync.RWMutex
+ lockGetTotalSUUsageForAllocation sync.RWMutex
+ lockGetTotalSUUsageForUserInAllocation sync.RWMutex
+ lockGetUser sync.RWMutex
+ lockGetUserByEmail sync.RWMutex
+ lockGetUserByUserIdentity sync.RWMutex
+ lockGetUserIdentity sync.RWMutex
+ lockGetUserIdentityByOIDCSub sync.RWMutex
+ lockGetUserIdentityBySourceAndExternalID sync.RWMutex
+ lockGrantPrivilege sync.RWMutex
+ lockGrantRoleToUser sync.RWMutex
+ lockHasPrivilege sync.RWMutex
+ lockListAllAuditEvents sync.RWMutex
+ lockListAllocationsForResource sync.RWMutex
+ lockListAllocationsForUser sync.RWMutex
+ lockListAuditEventsByEntity sync.RWMutex
+ lockListAuditEventsByEventType sync.RWMutex
+ lockListChangeRequestsByRequester sync.RWMutex
+ lockListChangeRequestsForAllocation sync.RWMutex
+ lockListComputeAllocationResources sync.RWMutex
+ lockListComputeAllocationResourcesByTypeAndCluster sync.RWMutex
+ lockListComputeAllocationsByCluster sync.RWMutex
+ lockListComputeAllocationsByProject sync.RWMutex
+ lockListComputeClusterUsersByCluster sync.RWMutex
+ lockListComputeClusterUsersByUser sync.RWMutex
+ lockListComputeClusters sync.RWMutex
+ lockListDiffsForAllocation sync.RWMutex
+ lockListEventsForChangeRequest sync.RWMutex
+ lockListMembersForAllocation sync.RWMutex
+ lockListOverridesForMembership sync.RWMutex
+ lockListOverridesForResource sync.RWMutex
+ lockListPrivilegeHolders sync.RWMutex
+ lockListProjectsByPI sync.RWMutex
+ lockListRatesForResource sync.RWMutex
+ lockListResourcesForAllocation sync.RWMutex
+ lockListRoleHolders sync.RWMutex
+ lockListRolePrivileges sync.RWMutex
+ lockListRoles sync.RWMutex
+ lockListUsagesByUser sync.RWMutex
+ lockListUsagesForAllocation sync.RWMutex
+ lockListUserIdentitiesForUser sync.RWMutex
+ lockListUserPrivileges sync.RWMutex
+ lockListUserRoles sync.RWMutex
+ lockListUsersByOrganization sync.RWMutex
+ lockMergeUsers sync.RWMutex
+ lockPrivilegeCatalog sync.RWMutex
+ lockRemovePrivilegeFromRole sync.RWMutex
+ lockRevokePrivilege sync.RWMutex
+ lockRevokeRoleFromUser sync.RWMutex
+ lockUpdateAllocationResourceMapping sync.RWMutex
+ lockUpdateComputeAllocation sync.RWMutex
+ lockUpdateComputeAllocationChangeRequest sync.RWMutex
+ lockUpdateComputeAllocationMembership sync.RWMutex
+ lockUpdateComputeAllocationMembershipResourceOverride sync.RWMutex
+ lockUpdateComputeAllocationResource sync.RWMutex
+ lockUpdateComputeAllocationResourceRate sync.RWMutex
+ lockUpdateComputeCluster sync.RWMutex
+ lockUpdateComputeClusterUser sync.RWMutex
+ lockUpdateMembershipStatus sync.RWMutex
+ lockUpdateOrganization sync.RWMutex
+ lockUpdateProject sync.RWMutex
+ lockUpdateProjectStatus sync.RWMutex
+ lockUpdateRole sync.RWMutex
+ lockUpdateUser sync.RWMutex
+ lockUpdateUserIdentity sync.RWMutex
+ lockUpdateUserStatus sync.RWMutex
}
// AddPrivilegeToRole calls AddPrivilegeToRoleFunc.
@@ -3638,6 +3702,46 @@ func (mock *CoreServiceMock)
GetComputeAllocationResourceCalls() []struct {
return calls
}
+// GetComputeAllocationResourceByNameAndCluster calls
GetComputeAllocationResourceByNameAndClusterFunc.
+func (mock *CoreServiceMock) GetComputeAllocationResourceByNameAndCluster(ctx
context.Context, name string, clusterID string)
(*models.ComputeAllocationResource, error) {
+ if mock.GetComputeAllocationResourceByNameAndClusterFunc == nil {
+
panic("CoreServiceMock.GetComputeAllocationResourceByNameAndClusterFunc: method
is nil but CoreService.GetComputeAllocationResourceByNameAndCluster was just
called")
+ }
+ callInfo := struct {
+ Ctx context.Context
+ Name string
+ ClusterID string
+ }{
+ Ctx: ctx,
+ Name: name,
+ ClusterID: clusterID,
+ }
+ mock.lockGetComputeAllocationResourceByNameAndCluster.Lock()
+ mock.calls.GetComputeAllocationResourceByNameAndCluster =
append(mock.calls.GetComputeAllocationResourceByNameAndCluster, callInfo)
+ mock.lockGetComputeAllocationResourceByNameAndCluster.Unlock()
+ return mock.GetComputeAllocationResourceByNameAndClusterFunc(ctx, name,
clusterID)
+}
+
+// GetComputeAllocationResourceByNameAndClusterCalls gets all the calls that
were made to GetComputeAllocationResourceByNameAndCluster.
+// Check the length with:
+//
+//
len(mockedCoreService.GetComputeAllocationResourceByNameAndClusterCalls())
+func (mock *CoreServiceMock)
GetComputeAllocationResourceByNameAndClusterCalls() []struct {
+ Ctx context.Context
+ Name string
+ ClusterID string
+} {
+ var calls []struct {
+ Ctx context.Context
+ Name string
+ ClusterID string
+ }
+ mock.lockGetComputeAllocationResourceByNameAndCluster.RLock()
+ calls = mock.calls.GetComputeAllocationResourceByNameAndCluster
+ mock.lockGetComputeAllocationResourceByNameAndCluster.RUnlock()
+ return calls
+}
+
// GetComputeAllocationResourceRate calls GetComputeAllocationResourceRateFunc.
func (mock *CoreServiceMock) GetComputeAllocationResourceRate(ctx
context.Context, id string) (*models.ComputeAllocationResourceRate, error) {
if mock.GetComputeAllocationResourceRateFunc == nil {
@@ -3710,6 +3814,46 @@ func (mock *CoreServiceMock)
GetComputeAllocationUsageCalls() []struct {
return calls
}
+// GetComputeAllocationUsageByComputeAllocationIDAndJobID calls
GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc.
+func (mock *CoreServiceMock)
GetComputeAllocationUsageByComputeAllocationIDAndJobID(ctx context.Context,
allocationID string, jobID string) (*models.ComputeAllocationUsage, error) {
+ if mock.GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc ==
nil {
+
panic("CoreServiceMock.GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc:
method is nil but
CoreService.GetComputeAllocationUsageByComputeAllocationIDAndJobID was just
called")
+ }
+ callInfo := struct {
+ Ctx context.Context
+ AllocationID string
+ JobID string
+ }{
+ Ctx: ctx,
+ AllocationID: allocationID,
+ JobID: jobID,
+ }
+ mock.lockGetComputeAllocationUsageByComputeAllocationIDAndJobID.Lock()
+ mock.calls.GetComputeAllocationUsageByComputeAllocationIDAndJobID =
append(mock.calls.GetComputeAllocationUsageByComputeAllocationIDAndJobID,
callInfo)
+ mock.lockGetComputeAllocationUsageByComputeAllocationIDAndJobID.Unlock()
+ return
mock.GetComputeAllocationUsageByComputeAllocationIDAndJobIDFunc(ctx,
allocationID, jobID)
+}
+
+// GetComputeAllocationUsageByComputeAllocationIDAndJobIDCalls gets all the
calls that were made to GetComputeAllocationUsageByComputeAllocationIDAndJobID.
+// Check the length with:
+//
+//
len(mockedCoreService.GetComputeAllocationUsageByComputeAllocationIDAndJobIDCalls())
+func (mock *CoreServiceMock)
GetComputeAllocationUsageByComputeAllocationIDAndJobIDCalls() []struct {
+ Ctx context.Context
+ AllocationID string
+ JobID string
+} {
+ var calls []struct {
+ Ctx context.Context
+ AllocationID string
+ JobID string
+ }
+ mock.lockGetComputeAllocationUsageByComputeAllocationIDAndJobID.RLock()
+ calls =
mock.calls.GetComputeAllocationUsageByComputeAllocationIDAndJobID
+
mock.lockGetComputeAllocationUsageByComputeAllocationIDAndJobID.RUnlock()
+ return calls
+}
+
// GetComputeCluster calls GetComputeClusterFunc.
func (mock *CoreServiceMock) GetComputeCluster(ctx context.Context, id string)
(*models.ComputeCluster, error) {
if mock.GetComputeClusterFunc == nil {
@@ -3818,6 +3962,46 @@ func (mock *CoreServiceMock)
GetComputeClusterUserCalls() []struct {
return calls
}
+// GetComputeClusterUserByLocalUsernameAndCluster calls
GetComputeClusterUserByLocalUsernameAndClusterFunc.
+func (mock *CoreServiceMock)
GetComputeClusterUserByLocalUsernameAndCluster(ctx context.Context, clusterID
string, localUsername string) (*models.ComputeClusterUser, error) {
+ if mock.GetComputeClusterUserByLocalUsernameAndClusterFunc == nil {
+
panic("CoreServiceMock.GetComputeClusterUserByLocalUsernameAndClusterFunc:
method is nil but CoreService.GetComputeClusterUserByLocalUsernameAndCluster
was just called")
+ }
+ callInfo := struct {
+ Ctx context.Context
+ ClusterID string
+ LocalUsername string
+ }{
+ Ctx: ctx,
+ ClusterID: clusterID,
+ LocalUsername: localUsername,
+ }
+ mock.lockGetComputeClusterUserByLocalUsernameAndCluster.Lock()
+ mock.calls.GetComputeClusterUserByLocalUsernameAndCluster =
append(mock.calls.GetComputeClusterUserByLocalUsernameAndCluster, callInfo)
+ mock.lockGetComputeClusterUserByLocalUsernameAndCluster.Unlock()
+ return mock.GetComputeClusterUserByLocalUsernameAndClusterFunc(ctx,
clusterID, localUsername)
+}
+
+// GetComputeClusterUserByLocalUsernameAndClusterCalls gets all the calls that
were made to GetComputeClusterUserByLocalUsernameAndCluster.
+// Check the length with:
+//
+//
len(mockedCoreService.GetComputeClusterUserByLocalUsernameAndClusterCalls())
+func (mock *CoreServiceMock)
GetComputeClusterUserByLocalUsernameAndClusterCalls() []struct {
+ Ctx context.Context
+ ClusterID string
+ LocalUsername string
+} {
+ var calls []struct {
+ Ctx context.Context
+ ClusterID string
+ LocalUsername string
+ }
+ mock.lockGetComputeClusterUserByLocalUsernameAndCluster.RLock()
+ calls = mock.calls.GetComputeClusterUserByLocalUsernameAndCluster
+ mock.lockGetComputeClusterUserByLocalUsernameAndCluster.RUnlock()
+ return calls
+}
+
// GetComputeClusterUserByPair calls GetComputeClusterUserByPairFunc.
func (mock *CoreServiceMock) GetComputeClusterUserByPair(ctx context.Context,
clusterID string, userID string) (*models.ComputeClusterUser, error) {
if mock.GetComputeClusterUserByPairFunc == nil {
@@ -4866,6 +5050,46 @@ func (mock *CoreServiceMock)
ListComputeAllocationResourcesCalls() []struct {
return calls
}
+// ListComputeAllocationResourcesByTypeAndCluster calls
ListComputeAllocationResourcesByTypeAndClusterFunc.
+//
+// The (ctx, resourceType, clusterID) arguments of every call are appended to
+// mock.calls.ListComputeAllocationResourcesByTypeAndCluster under
+// lockListComputeAllocationResourcesByTypeAndCluster, so tests can inspect
+// them via ListComputeAllocationResourcesByTypeAndClusterCalls. Panics if
+// ListComputeAllocationResourcesByTypeAndClusterFunc has not been set.
+//
+// NOTE(review): this appears to be moq-generated code; hand edits (including
+// these comments) may be lost when the mock is regenerated.
+func (mock *CoreServiceMock)
ListComputeAllocationResourcesByTypeAndCluster(ctx context.Context,
resourceType string, clusterID string) ([]models.ComputeAllocationResource,
error) {
+ // An unconfigured stub is treated as a test bug: fail loudly.
+ if mock.ListComputeAllocationResourcesByTypeAndClusterFunc == nil {
+
panic("CoreServiceMock.ListComputeAllocationResourcesByTypeAndClusterFunc:
method is nil but CoreService.ListComputeAllocationResourcesByTypeAndCluster
was just called")
+ }
+ callInfo := struct {
+ Ctx context.Context
+ ResourceType string
+ ClusterID string
+ }{
+ Ctx: ctx,
+ ResourceType: resourceType,
+ ClusterID: clusterID,
+ }
+ mock.lockListComputeAllocationResourcesByTypeAndCluster.Lock()
+ mock.calls.ListComputeAllocationResourcesByTypeAndCluster =
append(mock.calls.ListComputeAllocationResourcesByTypeAndCluster, callInfo)
+ mock.lockListComputeAllocationResourcesByTypeAndCluster.Unlock()
+ // The lock guards only the call record; the stub itself runs unlocked.
+ return mock.ListComputeAllocationResourcesByTypeAndClusterFunc(ctx,
resourceType, clusterID)
+}
+
+// ListComputeAllocationResourcesByTypeAndClusterCalls gets all the calls that
were made to ListComputeAllocationResourcesByTypeAndCluster.
+// Check the length with:
+//
+//
len(mockedCoreService.ListComputeAllocationResourcesByTypeAndClusterCalls())
+//
+// The record is read under the read side of
+// lockListComputeAllocationResourcesByTypeAndCluster, the same lock the mock
+// method takes when appending a call. The returned slice shares its backing
+// array with the mock's internal record, so treat it as read-only.
+func (mock *CoreServiceMock)
ListComputeAllocationResourcesByTypeAndClusterCalls() []struct {
+ Ctx context.Context
+ ResourceType string
+ ClusterID string
+} {
+ var calls []struct {
+ Ctx context.Context
+ ResourceType string
+ ClusterID string
+ }
+ mock.lockListComputeAllocationResourcesByTypeAndCluster.RLock()
+ calls = mock.calls.ListComputeAllocationResourcesByTypeAndCluster
+ mock.lockListComputeAllocationResourcesByTypeAndCluster.RUnlock()
+ return calls
+}
+
// ListComputeAllocationsByCluster calls ListComputeAllocationsByClusterFunc.
func (mock *CoreServiceMock) ListComputeAllocationsByCluster(ctx
context.Context, clusterID string) ([]models.ComputeAllocation, error) {
if mock.ListComputeAllocationsByClusterFunc == nil {
diff --git a/scripts/slurm-integration-tests.sh
b/scripts/slurm-mapper-integration-tests.sh
similarity index 59%
rename from scripts/slurm-integration-tests.sh
rename to scripts/slurm-mapper-integration-tests.sh
index ac88d0fe6..90831d147 100755
--- a/scripts/slurm-integration-tests.sh
+++ b/scripts/slurm-mapper-integration-tests.sh
@@ -12,9 +12,9 @@ REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
cd "${REPO_ROOT}"
-make -s -C dev-ops/local-slurm down
-make -s -C dev-ops/local-slurm build
-make -s -C dev-ops/local-slurm up
+#make -s -C dev-ops/local-slurm down
+#make -s -C dev-ops/local-slurm build
+#make -s -C dev-ops/local-slurm up
# Mint a fresh SLURM JWT via the local-slurm Makefile target.
# `make token` prints e.g. `SLURM_JWT=eyJhbGciOi...` — strip the prefix.
@@ -31,21 +31,21 @@ export TEST_SLURM_API_VERSION="41"
export TEST_SLURM_TOKEN="${TOKEN_LINE#SLURM_JWT=}"
echo "==> TEST_SLURM_TOKEN set (${#TEST_SLURM_TOKEN} chars)"
- go test -tags integration -v -count=1 \
- ./connectors/SLURM/Association-Mapper/internal/operations/...
+# go test -tags integration -v -count=1 \
+# ./connectors/SLURM/Rest-Client/pkg/client/...
go test -tags integration -v -count=1 \
./connectors/SLURM/Association-Mapper/internal/subscribers/...
#go test -tags integration -v -count=1 \
-# ./connectors/SLURM/Association-Mapper/internal/operations/accounts.go \
-# ./connectors/SLURM/Association-Mapper/internal/operations/associations.go \
-# ./connectors/SLURM/Association-Mapper/internal/operations/client.go \
-# ./connectors/SLURM/Association-Mapper/internal/operations/tres.go \
-# ./connectors/SLURM/Association-Mapper/internal/operations/types.go \
-#
./connectors/SLURM/Association-Mapper/internal/operations/integration_common.go
\
-#
./connectors/SLURM/Association-Mapper/internal/operations/associations_integration_test.go
+# ./connectors/SLURM/Rest-Client/pkg/client/accounts.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/associations.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/client.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/tres.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/types.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/integration_common.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/associations_integration_test.go
-make -s -C dev-ops/local-slurm down
\ No newline at end of file
+#make -s -C dev-ops/local-slurm down
\ No newline at end of file
diff --git a/scripts/slurm-monitor-integration-tests.sh
b/scripts/slurm-monitor-integration-tests.sh
new file mode 100755
index 000000000..bfcfdc211
--- /dev/null
+++ b/scripts/slurm-monitor-integration-tests.sh
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+# Run the SLURM Association-Mapper integration tests.
+#
+# Usage:
+# scripts/run-integrations-tests.sh # run all integration
tests in operations/
+# scripts/run-integrations-tests.sh -run TestFoo # forward extra flags to
`go test`
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+REPO_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)"
+
+cd "${REPO_ROOT}"
+
+#make -s -C dev-ops/local-slurm down
+#make -s -C dev-ops/local-slurm build
+#make -s -C dev-ops/local-slurm up
+
+# Mint a fresh SLURM JWT via the local-slurm Makefile target.
+# `make token` prints e.g. `SLURM_JWT=eyJhbGciOi...` — strip the prefix.
+echo "==> minting SLURM JWT via 'make token'"
+TOKEN_LINE="$(make -s -C dev-ops/local-slurm token | grep -E '^SLURM_JWT=' |
tail -n1)"
+if [[ -z "${TOKEN_LINE}" ]]; then
+ echo "ERROR: 'make token' did not produce a SLURM_JWT=... line" >&2
+ exit 1
+fi
+
+export TEST_SLURM_TOKEN="${TOKEN_LINE#SLURM_JWT=}"
+echo "==> TEST_SLURM_TOKEN set (${#TEST_SLURM_TOKEN} chars)"
+
+TOKEN_LINE2="$(make -s -C dev-ops/local-slurm token2 | grep -E '^SLURM_JWT=' |
tail -n1)"
+if [[ -z "${TOKEN_LINE2}" ]]; then
+ echo "ERROR: 'make token2' did not produce a SLURM_JWT=... line" >&2
+ exit 1
+fi
+
+export TEST_SLURM_TOKEN2="${TOKEN_LINE2#SLURM_JWT=}"
+echo "==> TEST_SLURM_TOKEN2 set (${#TEST_SLURM_TOKEN2} chars)"
+
+
+TOKEN_LINE3="$(make -s -C dev-ops/local-slurm token3 | grep -E '^SLURM_JWT=' |
tail -n1)"
+if [[ -z "${TOKEN_LINE3}" ]]; then
+ echo "ERROR: 'make token3' did not produce a SLURM_JWT=... line" >&2
+ exit 1
+fi
+
+export TEST_SLURM_TOKEN3="${TOKEN_LINE3#SLURM_JWT=}"
+echo "==> TEST_SLURM_TOKEN3 set (${#TEST_SLURM_TOKEN3} chars)"
+
+TOKEN_LINE4="$(make -s -C dev-ops/local-slurm token4 | grep -E '^SLURM_JWT=' |
tail -n1)"
+if [[ -z "${TOKEN_LINE4}" ]]; then
+ echo "ERROR: 'make token4' did not produce a SLURM_JWT=... line" >&2
+ exit 1
+fi
+
+export TEST_SLURM_TOKEN4="${TOKEN_LINE4#SLURM_JWT=}"
+echo "==> TEST_SLURM_TOKEN4 set (${#TEST_SLURM_TOKEN4} chars)"
+
+export TEST_SLURM_API="http://localhost:6820"
+export TEST_SLURM_USER="root"
+export TEST_SLURM_API_VERSION="41"
+
+# go test -tags integration -v -count=1 \
+# ./connectors/SLURM/Rest-Client/pkg/client/...
+
+ go test -tags integration -v -count=1 \
+ ./connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor.go \
+
./connectors/SLURM/Usage-Monitor/internal/smonitor/smonitor_integration_test.go
+
+
+#go test -tags integration -v -count=1 \
+# ./connectors/SLURM/Rest-Client/pkg/client/accounts.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/associations.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/client.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/tres.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/types.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/integration_common.go \
+# ./connectors/SLURM/Rest-Client/pkg/client/associations_integration_test.go
+
+
+#make -s -C dev-ops/local-slurm down
\ No newline at end of file