This is an automated email from the ASF dual-hosted git repository. DImuthuUpe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit f0153ce418c72a0e017b869e9ccfce34a742cc1a Author: lahiruj <[email protected]> AuthorDate: Thu May 21 15:30:36 2026 -0400 Rename ExternalIdentity to UserIdentity and add source email column --- ...> 000017_user_identities_and_user_dns.down.sql} | 2 +- ... => 000017_user_identities_and_user_dns.up.sql} | 12 +- internal/server/server.go | 46 ++--- internal/store/external_identity_store.go | 132 -------------- internal/store/store.go | 31 ++-- internal/store/user_identity_store.go | 132 ++++++++++++++ pkg/events/external_identity_subscribe.go | 64 ------- pkg/events/types.go | 8 +- pkg/events/user_identity_subscribe.go | 63 +++++++ pkg/models/identity.go | 11 +- pkg/service/external_identity.go | 191 --------------------- pkg/service/service.go | 8 +- pkg/service/user.go | 15 +- pkg/service/user_identity.go | 190 ++++++++++++++++++++ pkg/service/user_merge.go | 6 +- 15 files changed, 456 insertions(+), 455 deletions(-) diff --git a/internal/db/migrations/000017_external_identities_and_user_dns.down.sql b/internal/db/migrations/000017_user_identities_and_user_dns.down.sql similarity index 95% rename from internal/db/migrations/000017_external_identities_and_user_dns.down.sql rename to internal/db/migrations/000017_user_identities_and_user_dns.down.sql index 88dae2ca8..72d75c517 100644 --- a/internal/db/migrations/000017_external_identities_and_user_dns.down.sql +++ b/internal/db/migrations/000017_user_identities_and_user_dns.down.sql @@ -16,4 +16,4 @@ -- under the License. DROP TABLE IF EXISTS user_dns; -DROP TABLE IF EXISTS external_identities; +DROP TABLE IF EXISTS user_identities; diff --git a/internal/db/migrations/000017_external_identities_and_user_dns.up.sql b/internal/db/migrations/000017_user_identities_and_user_dns.up.sql similarity index 82% rename from internal/db/migrations/000017_external_identities_and_user_dns.up.sql rename to internal/db/migrations/000017_user_identities_and_user_dns.up.sql index b093a2ff8..313f1de71 100644 --- a/internal/db/migrations/000017_external_identities_and_user_dns.up.sql +++ b/internal/db/migrations/000017_user_identities_and_user_dns.up.sql @@ -15,24 +15,26 @@ -- specific language governing permissions and limitations -- under the License. +-- email captures the address the source IdP configured for this identity. -- oidc_sub is nullable: not every external identity issues an OIDC subject -- (AMIE binds by external_id only). UNIQUE permits multiple NULLs but blocks -- collisions on real values across IdPs. -CREATE TABLE IF NOT EXISTS external_identities +CREATE TABLE IF NOT EXISTS user_identities ( id VARCHAR(255) NOT NULL, user_id VARCHAR(255) NOT NULL, source VARCHAR(64) NOT NULL, external_id VARCHAR(255) NOT NULL, + email VARCHAR(255) NULL DEFAULT NULL, oidc_sub VARCHAR(255) NULL DEFAULT NULL, metadata TEXT NULL DEFAULT NULL, created_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), updated_at TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), PRIMARY KEY (id), - UNIQUE KEY uq_external_identities_source_external (source, external_id), - UNIQUE KEY uq_external_identities_oidc_sub (oidc_sub), - KEY idx_external_identities_user (user_id), - CONSTRAINT fk_external_identities_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE + UNIQUE KEY uq_user_identities_source_external (source, external_id), + UNIQUE KEY uq_user_identities_oidc_sub (oidc_sub), + KEY idx_user_identities_user (user_id), + CONSTRAINT fk_user_identities_user FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_unicode_ci; -- A DN is a globally-unique credential. UNIQUE on dn alone subsumes diff --git a/internal/server/server.go b/internal/server/server.go index 05685d59a..30e05a9a3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -138,13 +138,13 @@ func (s *Server) routes() { s.mux.HandleFunc("GET /compute-allocations/{id}/users/{userId}/usages/total", s.getTotalSUUsageForUserInAllocation) s.mux.HandleFunc("GET /users/{id}/compute-allocation-usages", s.listUsagesByUser) - s.mux.HandleFunc("POST /external-identities", s.createExternalIdentity) - s.mux.HandleFunc("GET /external-identities/{id}", s.getExternalIdentity) - s.mux.HandleFunc("PUT /external-identities/{id}", s.updateExternalIdentity) - s.mux.HandleFunc("DELETE /external-identities/{id}", s.deleteExternalIdentity) - s.mux.HandleFunc("GET /external-identities/sources/{source}/external/{externalId}", s.getExternalIdentityBySourceAndExternalID) - s.mux.HandleFunc("GET /external-identities/oidc-subjects/{oidcSub}", s.getExternalIdentityByOIDCSub) - s.mux.HandleFunc("GET /users/{id}/external-identities", s.listExternalIdentitiesForUser) + s.mux.HandleFunc("POST /user-identities", s.createUserIdentity) + s.mux.HandleFunc("GET /user-identities/{id}", s.getUserIdentity) + s.mux.HandleFunc("PUT /user-identities/{id}", s.updateUserIdentity) + s.mux.HandleFunc("DELETE /user-identities/{id}", s.deleteUserIdentity) + s.mux.HandleFunc("GET /user-identities/sources/{source}/external/{externalId}", s.getUserIdentityBySourceAndExternalID) + s.mux.HandleFunc("GET /user-identities/oidc-subjects/{oidcSub}", s.getUserIdentityByOIDCSub) + s.mux.HandleFunc("GET /users/{id}/user-identities", s.listUserIdentitiesForUser) s.mux.HandleFunc("POST /user-dns", s.addUserDN) s.mux.HandleFunc("GET /user-dns/{id}", s.getUserDN) @@ -928,13 +928,13 @@ func (s *Server) updateComputeClusterUserStatus(w http.ResponseWriter, r *http.R writeJSON(w, http.StatusOK, cu) } -func (s *Server) createExternalIdentity(w http.ResponseWriter, r *http.Request) { - var e models.ExternalIdentity +func (s *Server) createUserIdentity(w http.ResponseWriter, r *http.Request) { + var e models.UserIdentity if err := decodeJSON(r, &e); err != nil { writeError(w, http.StatusBadRequest, err) return } - created, err := s.svc.CreateExternalIdentity(r.Context(), &e) + created, err := s.svc.CreateUserIdentity(r.Context(), &e) if err != nil { writeServiceError(w, err) return @@ -942,8 +942,8 @@ func (s *Server) createExternalIdentity(w http.ResponseWriter, r *http.Request) writeJSON(w, http.StatusCreated, created) } -func (s *Server) getExternalIdentity(w http.ResponseWriter, r *http.Request) { - e, err := s.svc.GetExternalIdentity(r.Context(), r.PathValue("id")) +func (s *Server) getUserIdentity(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetUserIdentity(r.Context(), r.PathValue("id")) if err != nil { writeServiceError(w, err) return @@ -951,8 +951,8 @@ func (s *Server) getExternalIdentity(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, e) } -func (s *Server) getExternalIdentityBySourceAndExternalID(w http.ResponseWriter, r *http.Request) { - e, err := s.svc.GetExternalIdentityBySourceAndExternalID(r.Context(), r.PathValue("source"), r.PathValue("externalId")) +func (s *Server) getUserIdentityBySourceAndExternalID(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetUserIdentityBySourceAndExternalID(r.Context(), r.PathValue("source"), r.PathValue("externalId")) if err != nil { writeServiceError(w, err) return @@ -960,8 +960,8 @@ func (s *Server) getExternalIdentityBySourceAndExternalID(w http.ResponseWriter, writeJSON(w, http.StatusOK, e) } -func (s *Server) getExternalIdentityByOIDCSub(w http.ResponseWriter, r *http.Request) { - e, err := s.svc.GetExternalIdentityByOIDCSub(r.Context(), r.PathValue("oidcSub")) +func (s *Server) getUserIdentityByOIDCSub(w http.ResponseWriter, r *http.Request) { + e, err := s.svc.GetUserIdentityByOIDCSub(r.Context(), r.PathValue("oidcSub")) if err != nil { writeServiceError(w, err) return @@ -969,8 +969,8 @@ func (s *Server) getExternalIdentityByOIDCSub(w http.ResponseWriter, r *http.Req writeJSON(w, http.StatusOK, e) } -func (s *Server) listExternalIdentitiesForUser(w http.ResponseWriter, r *http.Request) { - out, err := s.svc.ListExternalIdentitiesForUser(r.Context(), r.PathValue("id")) +func (s *Server) listUserIdentitiesForUser(w http.ResponseWriter, r *http.Request) { + out, err := s.svc.ListUserIdentitiesForUser(r.Context(), r.PathValue("id")) if err != nil { writeServiceError(w, err) return @@ -978,22 +978,22 @@ func (s *Server) listExternalIdentitiesForUser(w http.ResponseWriter, r *http.Re writeJSON(w, http.StatusOK, out) } -func (s *Server) updateExternalIdentity(w http.ResponseWriter, r *http.Request) { - var e models.ExternalIdentity +func (s *Server) updateUserIdentity(w http.ResponseWriter, r *http.Request) { + var e models.UserIdentity if err := decodeJSON(r, &e); err != nil { writeError(w, http.StatusBadRequest, err) return } e.ID = r.PathValue("id") - if err := s.svc.UpdateExternalIdentity(r.Context(), &e); err != nil { + if err := s.svc.UpdateUserIdentity(r.Context(), &e); err != nil { writeServiceError(w, err) return } writeJSON(w, http.StatusOK, &e) } -func (s *Server) deleteExternalIdentity(w http.ResponseWriter, r *http.Request) { - if err := s.svc.DeleteExternalIdentity(r.Context(), r.PathValue("id")); err != nil { +func (s *Server) deleteUserIdentity(w http.ResponseWriter, r *http.Request) { + if err := s.svc.DeleteUserIdentity(r.Context(), r.PathValue("id")); err != nil { writeServiceError(w, err) return } diff --git a/internal/store/external_identity_store.go b/internal/store/external_identity_store.go deleted file mode 100644 index 4728f2807..000000000 --- a/internal/store/external_identity_store.go +++ /dev/null @@ -1,132 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package store - -import ( - "context" - "database/sql" - "errors" - - "github.com/jmoiron/sqlx" - - "github.com/apache/airavata-custos/pkg/models" -) - -type mysqlExternalIdentityStore struct { - db *sqlx.DB -} - -// NewExternalIdentityStore returns a MySQL-backed ExternalIdentityStore. -func NewExternalIdentityStore(db *sqlx.DB) ExternalIdentityStore { - return &mysqlExternalIdentityStore{db: db} -} - -// oidc_sub and metadata are nullable; project NULL to ” for the model. -const externalIdentityColumns = `id, user_id, source, external_id, COALESCE(oidc_sub, '') AS oidc_sub, COALESCE(metadata, '') AS metadata, created_at` - -// nullableString returns nil when s is empty so the column stores SQL NULL -// rather than ”. NULL is the only value MySQL UNIQUE allows to repeat. -func nullableString(s string) any { - if s == "" { - return nil - } - return s -} - -func (s *mysqlExternalIdentityStore) FindByID(ctx context.Context, id string) (*models.ExternalIdentity, error) { - var e models.ExternalIdentity - err := s.db.GetContext(ctx, &e, - `SELECT `+externalIdentityColumns+` FROM external_identities WHERE id = ?`, id) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - return &e, nil -} - -func (s *mysqlExternalIdentityStore) FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) { - var e models.ExternalIdentity - err := s.db.GetContext(ctx, &e, - `SELECT `+externalIdentityColumns+` FROM external_identities WHERE source = ? AND external_id = ?`, - source, externalID) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - return &e, nil -} - -func (s *mysqlExternalIdentityStore) FindByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) { - if oidcSub == "" { - return nil, nil - } - var e models.ExternalIdentity - err := s.db.GetContext(ctx, &e, - `SELECT `+externalIdentityColumns+` FROM external_identities WHERE oidc_sub = ?`, oidcSub) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, nil - } - return nil, err - } - return &e, nil -} - -func (s *mysqlExternalIdentityStore) FindByUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) { - var out []models.ExternalIdentity - err := s.db.SelectContext(ctx, &out, - `SELECT `+externalIdentityColumns+` FROM external_identities WHERE user_id = ? ORDER BY created_at ASC`, - userID) - if err != nil { - return nil, err - } - return out, nil -} - -func (s *mysqlExternalIdentityStore) Create(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error { - _, err := tx.ExecContext(ctx, - `INSERT INTO external_identities (id, user_id, source, external_id, oidc_sub, metadata) - VALUES (?, ?, ?, ?, ?, ?)`, - e.ID, e.UserID, e.Source, e.ExternalID, nullableString(e.OIDCSub), nullableString(e.Metadata)) - return err -} - -func (s *mysqlExternalIdentityStore) Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error { - _, err := tx.ExecContext(ctx, - `UPDATE external_identities - SET user_id = ?, source = ?, external_id = ?, oidc_sub = ?, metadata = ? - WHERE id = ?`, - e.UserID, e.Source, e.ExternalID, nullableString(e.OIDCSub), nullableString(e.Metadata), e.ID) - return err -} - -func (s *mysqlExternalIdentityStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { - _, err := tx.ExecContext(ctx, - `UPDATE external_identities SET user_id = ? WHERE user_id = ?`, - toUserID, fromUserID) - return err -} - -func (s *mysqlExternalIdentityStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { - _, err := tx.ExecContext(ctx, `DELETE FROM external_identities WHERE id = ?`, id) - return err -} diff --git a/internal/store/store.go b/internal/store/store.go index 32824f7ff..797bb5526 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -108,24 +108,25 @@ type ComputeClusterUserStore interface { Delete(ctx context.Context, tx *sql.Tx, id string) error } -// ExternalIdentityStore defines persistence operations for external-identity -// bindings between Custos users and identifiers issued by external systems. -type ExternalIdentityStore interface { - // FindByID returns the external identity with the given ID, or nil if not found. - FindByID(ctx context.Context, id string) (*models.ExternalIdentity, error) +// UserIdentityStore defines persistence operations for the bindings between +// Custos users and identifiers issued by external systems (ACCESS, NAIRR, +// CILogon, etc.). +type UserIdentityStore interface { + // FindByID returns the user identity with the given ID, or nil if not found. + FindByID(ctx context.Context, id string) (*models.UserIdentity, error) // FindBySourceAndExternalID returns the binding for the given (source, external_id) pair, or nil if absent. - FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) + FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.UserIdentity, error) // FindByOIDCSub returns the first binding matching the given OIDC subject, or nil if none. - FindByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) - // FindByUser returns every external identity bound to the given user, ordered by created_at. - FindByUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) - // Create inserts a new external identity within the provided transaction. - Create(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error - // Update replaces mutable fields of an existing external identity within the provided transaction. - Update(ctx context.Context, tx *sql.Tx, e *models.ExternalIdentity) error - // ReassignUser moves every external identity owned by fromUserID over to toUserID. + FindByOIDCSub(ctx context.Context, oidcSub string) (*models.UserIdentity, error) + // FindByUser returns every user identity bound to the given user, ordered by created_at. + FindByUser(ctx context.Context, userID string) ([]models.UserIdentity, error) + // Create inserts a new user identity within the provided transaction. + Create(ctx context.Context, tx *sql.Tx, e *models.UserIdentity) error + // Update replaces mutable fields of an existing user identity within the provided transaction. + Update(ctx context.Context, tx *sql.Tx, e *models.UserIdentity) error + // ReassignUser moves every user identity owned by fromUserID over to toUserID. ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error - // Delete removes an external identity by ID within the provided transaction. + // Delete removes a user identity by ID within the provided transaction. Delete(ctx context.Context, tx *sql.Tx, id string) error } diff --git a/internal/store/user_identity_store.go b/internal/store/user_identity_store.go new file mode 100644 index 000000000..6dd74b08c --- /dev/null +++ b/internal/store/user_identity_store.go @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package store + +import ( + "context" + "database/sql" + "errors" + + "github.com/jmoiron/sqlx" + + "github.com/apache/airavata-custos/pkg/models" +) + +type mysqlUserIdentityStore struct { + db *sqlx.DB +} + +// NewUserIdentityStore returns a MySQL-backed UserIdentityStore. +func NewUserIdentityStore(db *sqlx.DB) UserIdentityStore { + return &mysqlUserIdentityStore{db: db} +} + +// email, oidc_sub and metadata are nullable; project NULL to "" for the model. +const userIdentityColumns = `id, user_id, source, external_id, COALESCE(email, '') AS email, COALESCE(oidc_sub, '') AS oidc_sub, COALESCE(metadata, '') AS metadata, created_at` + +// nullableString returns nil when s is empty so the column stores SQL NULL +// rather than "". NULL is the only value MySQL UNIQUE allows to repeat. +func nullableString(s string) any { + if s == "" { + return nil + } + return s +} + +func (s *mysqlUserIdentityStore) FindByID(ctx context.Context, id string) (*models.UserIdentity, error) { + var e models.UserIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+userIdentityColumns+` FROM user_identities WHERE id = ?`, id) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlUserIdentityStore) FindBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.UserIdentity, error) { + var e models.UserIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+userIdentityColumns+` FROM user_identities WHERE source = ? AND external_id = ?`, + source, externalID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlUserIdentityStore) FindByOIDCSub(ctx context.Context, oidcSub string) (*models.UserIdentity, error) { + if oidcSub == "" { + return nil, nil + } + var e models.UserIdentity + err := s.db.GetContext(ctx, &e, + `SELECT `+userIdentityColumns+` FROM user_identities WHERE oidc_sub = ?`, oidcSub) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, err + } + return &e, nil +} + +func (s *mysqlUserIdentityStore) FindByUser(ctx context.Context, userID string) ([]models.UserIdentity, error) { + var out []models.UserIdentity + err := s.db.SelectContext(ctx, &out, + `SELECT `+userIdentityColumns+` FROM user_identities WHERE user_id = ? ORDER BY created_at ASC`, + userID) + if err != nil { + return nil, err + } + return out, nil +} + +func (s *mysqlUserIdentityStore) Create(ctx context.Context, tx *sql.Tx, e *models.UserIdentity) error { + _, err := tx.ExecContext(ctx, + `INSERT INTO user_identities (id, user_id, source, external_id, email, oidc_sub, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + e.ID, e.UserID, e.Source, e.ExternalID, nullableString(e.Email), nullableString(e.OIDCSub), nullableString(e.Metadata)) + return err +} + +func (s *mysqlUserIdentityStore) Update(ctx context.Context, tx *sql.Tx, e *models.UserIdentity) error { + _, err := tx.ExecContext(ctx, + `UPDATE user_identities + SET user_id = ?, source = ?, external_id = ?, email = ?, oidc_sub = ?, metadata = ? + WHERE id = ?`, + e.UserID, e.Source, e.ExternalID, nullableString(e.Email), nullableString(e.OIDCSub), nullableString(e.Metadata), e.ID) + return err +} + +func (s *mysqlUserIdentityStore) ReassignUser(ctx context.Context, tx *sql.Tx, fromUserID, toUserID string) error { + _, err := tx.ExecContext(ctx, + `UPDATE user_identities SET user_id = ? WHERE user_id = ?`, + toUserID, fromUserID) + return err +} + +func (s *mysqlUserIdentityStore) Delete(ctx context.Context, tx *sql.Tx, id string) error { + _, err := tx.ExecContext(ctx, `DELETE FROM user_identities WHERE id = ?`, id) + return err +} diff --git a/pkg/events/external_identity_subscribe.go b/pkg/events/external_identity_subscribe.go deleted file mode 100644 index 05e2722ff..000000000 --- a/pkg/events/external_identity_subscribe.go +++ /dev/null @@ -1,64 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package events - -import ( - "log/slog" - - "github.com/apache/airavata-custos/pkg/models" -) - -// ExternalIdentityHandler handles external-identity lifecycle events with a -// typed payload. -type ExternalIdentityHandler func(identity models.ExternalIdentity) - -// SubscribeExternalIdentityCreated registers a typed handler invoked whenever -// an external_identity::create event is published. -func (b *Bus) SubscribeExternalIdentityCreated(handler ExternalIdentityHandler) { - b.subscribeExternalIdentity(ExternalIdentityCreateEvent, handler) -} - -// SubscribeExternalIdentityUpdated registers a typed handler invoked whenever -// an external_identity::update event is published. -func (b *Bus) SubscribeExternalIdentityUpdated(handler ExternalIdentityHandler) { - b.subscribeExternalIdentity(ExternalIdentityUpdateEvent, handler) -} - -// SubscribeExternalIdentityDeleted registers a typed handler invoked whenever -// an external_identity::delete event is published. -func (b *Bus) SubscribeExternalIdentityDeleted(handler ExternalIdentityHandler) { - b.subscribeExternalIdentity(ExternalIdentityDeleteEvent, handler) -} - -func (b *Bus) subscribeExternalIdentity(topic EventType, handler ExternalIdentityHandler) { - b.Subscribe(topic, func(event Event, value interface{}) { - switch e := value.(type) { - case models.ExternalIdentity: - handler(e) - case *models.ExternalIdentity: - if e != nil { - handler(*e) - } - default: - slog.Warn("external identity event payload has unexpected type", - "type", event.Type, - "got", value, - ) - } - }) -} diff --git a/pkg/events/types.go b/pkg/events/types.go index 4e191e27b..6cfc6300a 100644 --- a/pkg/events/types.go +++ b/pkg/events/types.go @@ -110,11 +110,11 @@ const ( ComputeAllocationResourceMappingDeleteEvent EventType = "compute_allocation_resource_mapping::delete" ) -// ExternalIdentity lifecycle message types. +// UserIdentity lifecycle message types. const ( - ExternalIdentityCreateEvent EventType = "external_identity::create" - ExternalIdentityUpdateEvent EventType = "external_identity::update" - ExternalIdentityDeleteEvent EventType = "external_identity::delete" + UserIdentityCreateEvent EventType = "user_identity::create" + UserIdentityUpdateEvent EventType = "user_identity::update" + UserIdentityDeleteEvent EventType = "user_identity::delete" ) // UserDN lifecycle message types. DN bindings are append-only credentials, so diff --git a/pkg/events/user_identity_subscribe.go b/pkg/events/user_identity_subscribe.go new file mode 100644 index 000000000..dfbf7df47 --- /dev/null +++ b/pkg/events/user_identity_subscribe.go @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "log/slog" + + "github.com/apache/airavata-custos/pkg/models" +) + +// UserIdentityHandler handles user-identity lifecycle events with a typed payload. +type UserIdentityHandler func(identity models.UserIdentity) + +// SubscribeUserIdentityCreated registers a typed handler invoked whenever a +// user_identity::create event is published. +func (b *Bus) SubscribeUserIdentityCreated(handler UserIdentityHandler) { + b.subscribeUserIdentity(UserIdentityCreateEvent, handler) +} + +// SubscribeUserIdentityUpdated registers a typed handler invoked whenever a +// user_identity::update event is published. +func (b *Bus) SubscribeUserIdentityUpdated(handler UserIdentityHandler) { + b.subscribeUserIdentity(UserIdentityUpdateEvent, handler) +} + +// SubscribeUserIdentityDeleted registers a typed handler invoked whenever a +// user_identity::delete event is published. +func (b *Bus) SubscribeUserIdentityDeleted(handler UserIdentityHandler) { + b.subscribeUserIdentity(UserIdentityDeleteEvent, handler) +} + +func (b *Bus) subscribeUserIdentity(topic EventType, handler UserIdentityHandler) { + b.Subscribe(topic, func(event Event, value interface{}) { + switch e := value.(type) { + case models.UserIdentity: + handler(e) + case *models.UserIdentity: + if e != nil { + handler(*e) + } + default: + slog.Warn("user identity event payload has unexpected type", + "type", event.Type, + "got", value, + ) + } + }) +} diff --git a/pkg/models/identity.go b/pkg/models/identity.go index dc3cf5d95..83d24257b 100644 --- a/pkg/models/identity.go +++ b/pkg/models/identity.go @@ -19,15 +19,16 @@ package models import "time" -// ExternalIdentity links a User to its identifier in an external system -// (ACCESS, NAIRR, CILogon, etc.). One user may have many external identities. -// Source-specific attributes (e.g. NSF status code, ACCESS org code) belong -// in Metadata as a JSON-encoded blob. -type ExternalIdentity struct { +// UserIdentity links a User to its identifier in an external system (ACCESS, +// NAIRR, CILogon, etc.). One user may have many identities. Source-specific +// attributes (e.g. NSF status code, ACCESS org code) belong in Metadata as a +// JSON-encoded blob. +type UserIdentity struct { ID string `json:"id" db:"id"` UserID string `json:"user_id" db:"user_id"` Source string `json:"source" db:"source"` // e.g. access, nairr, cilogon ExternalID string `json:"external_id" db:"external_id"` // the source's native identifier + Email string `json:"email,omitempty" db:"email"` // the email configured by the source OIDCSub string `json:"oidc_sub,omitempty" db:"oidc_sub"` // OIDC subject when the source issues one Metadata string `json:"metadata,omitempty" db:"metadata"` // JSON-encoded source-specific fields CreatedAt time.Time `json:"created_at" db:"created_at"` diff --git a/pkg/service/external_identity.go b/pkg/service/external_identity.go deleted file mode 100644 index 1fe89af74..000000000 --- a/pkg/service/external_identity.go +++ /dev/null @@ -1,191 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package service - -import ( - "context" - "database/sql" - "fmt" - - "github.com/apache/airavata-custos/pkg/events" - "github.com/apache/airavata-custos/pkg/models" -) - -// CreateExternalIdentity persists a new external identity. If e.ID is empty, a -// new UUID is generated. The referenced user must already exist and the -// (source, external_id) pair is unique. -func (s *Service) CreateExternalIdentity(ctx context.Context, e *models.ExternalIdentity) (*models.ExternalIdentity, error) { - if e == nil { - return nil, fmt.Errorf("%w: external identity is nil", ErrInvalidInput) - } - if e.UserID == "" { - return nil, fmt.Errorf("%w: external identity user_id is required", ErrInvalidInput) - } - if e.Source == "" { - return nil, fmt.Errorf("%w: external identity source is required", ErrInvalidInput) - } - if e.ExternalID == "" { - return nil, fmt.Errorf("%w: external identity external_id is required", ErrInvalidInput) - } - - if user, err := s.users.FindByID(ctx, e.UserID); err != nil { - return nil, fmt.Errorf("verify user: %w", err) - } else if user == nil { - return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, e.UserID) - } - - if existing, err := s.extIDs.FindBySourceAndExternalID(ctx, e.Source, e.ExternalID); err != nil { - return nil, fmt.Errorf("lookup external identity: %w", err) - } else if existing != nil { - return nil, fmt.Errorf("%w: external identity for source %q, external_id %q", ErrAlreadyExists, e.Source, e.ExternalID) - } - - if e.ID == "" { - e.ID = newID() - } - - if err := s.inTx(ctx, func(tx *sql.Tx) error { - return s.extIDs.Create(ctx, tx, e) - }); err != nil { - return nil, fmt.Errorf("create external identity: %w", err) - } - - s.eventBus.Publish(events.ExternalIdentityCreateEvent, e) - return e, nil -} - -// GetExternalIdentity retrieves an external identity by ID. Returns -// ErrNotFound when no row matches. -func (s *Service) GetExternalIdentity(ctx context.Context, id string) (*models.ExternalIdentity, error) { - e, err := s.extIDs.FindByID(ctx, id) - if err != nil { - return nil, fmt.Errorf("get external identity: %w", err) - } - if e == nil { - return nil, ErrNotFound - } - return e, nil -} - -// GetExternalIdentityBySourceAndExternalID retrieves the unique external -// identity for the given (source, external_id) pair. -func (s *Service) GetExternalIdentityBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.ExternalIdentity, error) { - if source == "" { - return nil, fmt.Errorf("%w: source is required", ErrInvalidInput) - } - if externalID == "" { - return nil, fmt.Errorf("%w: external_id is required", ErrInvalidInput) - } - e, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID) - if err != nil { - return nil, fmt.Errorf("get external identity by source/external_id: %w", err) - } - if e == nil { - return nil, ErrNotFound - } - return e, nil -} - -// GetExternalIdentityByOIDCSub retrieves the first external identity matching -// the given OIDC subject. -func (s *Service) GetExternalIdentityByOIDCSub(ctx context.Context, oidcSub string) (*models.ExternalIdentity, error) { - if oidcSub == "" { - return nil, fmt.Errorf("%w: oidc_sub is required", ErrInvalidInput) - } - e, err := s.extIDs.FindByOIDCSub(ctx, oidcSub) - if err != nil { - return nil, fmt.Errorf("get external identity by oidc_sub: %w", err) - } - if e == nil { - return nil, ErrNotFound - } - return e, nil -} - -// ListExternalIdentitiesForUser returns every external identity belonging to -// the given user. -func (s *Service) ListExternalIdentitiesForUser(ctx context.Context, userID string) ([]models.ExternalIdentity, error) { - if userID == "" { - return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput) - } - out, err := s.extIDs.FindByUser(ctx, userID) - if err != nil { - return nil, fmt.Errorf("list external identities by user: %w", err) - } - return out, nil -} - -// UpdateExternalIdentity persists changes to an existing external identity. -// Fields left blank/zero on the supplied record fall back to the stored value. -func (s *Service) UpdateExternalIdentity(ctx context.Context, e *models.ExternalIdentity) error { - if e == nil || e.ID == "" { - return fmt.Errorf("%w: external identity id is required", ErrInvalidInput) - } - existing, err := s.extIDs.FindByID(ctx, e.ID) - if err != nil { - return fmt.Errorf("lookup external identity: %w", err) - } - if existing == nil { - return ErrNotFound - } - if e.UserID == "" { - e.UserID = existing.UserID - } - if e.Source == "" { - e.Source = existing.Source - } - if e.ExternalID == "" { - e.ExternalID = existing.ExternalID - } - if e.OIDCSub == "" { - e.OIDCSub = existing.OIDCSub - } - if e.Metadata == "" { - e.Metadata = existing.Metadata - } - if err := s.inTx(ctx, func(tx *sql.Tx) error { - return s.extIDs.Update(ctx, tx, e) - }); err != nil { - return fmt.Errorf("update external identity: %w", err) - } - - s.eventBus.Publish(events.ExternalIdentityUpdateEvent, e) - return nil -} - -// DeleteExternalIdentity removes an external identity by ID. -func (s *Service) DeleteExternalIdentity(ctx context.Context, id string) error { - if id == "" { - return fmt.Errorf("%w: external identity id is required", ErrInvalidInput) - } - e, err := s.extIDs.FindByID(ctx, id) - if err != nil { - return fmt.Errorf("lookup external identity: %w", err) - } - if e == nil { - return ErrNotFound - } - if err := s.inTx(ctx, func(tx *sql.Tx) error { - return s.extIDs.Delete(ctx, tx, id) - }); err != nil { - return fmt.Errorf("delete external identity: %w", err) - } - - s.eventBus.Publish(events.ExternalIdentityDeleteEvent, e) - return nil -} diff --git a/pkg/service/service.go b/pkg/service/service.go index 3ef5c92cd..6b0cdb555 100644 --- a/pkg/service/service.go +++ b/pkg/service/service.go @@ -51,7 +51,7 @@ type Service struct { memberships store.ComputeAllocationMembershipStore membershipOverrides store.ComputeAllocationMembershipResourceOverrideStore usages store.ComputeAllocationUsageStore - extIDs store.ExternalIdentityStore + userIdentities store.UserIdentityStore userDNs store.UserDNStore userMerges store.UserMergeStore } @@ -77,7 +77,7 @@ func New(database *sqlx.DB, eventBus *events.Bus) *Service { memberships: store.NewComputeAllocationMembershipStore(database), membershipOverrides: store.NewComputeAllocationMembershipResourceOverrideStore(database), usages: store.NewComputeAllocationUsageStore(database), - extIDs: store.NewExternalIdentityStore(database), + userIdentities: store.NewUserIdentityStore(database), userDNs: store.NewUserDNStore(database), userMerges: store.NewUserMergeStore(database), } @@ -104,7 +104,7 @@ func NewWithStores( membershipOverrides store.ComputeAllocationMembershipResourceOverrideStore, memberships store.ComputeAllocationMembershipStore, usages store.ComputeAllocationUsageStore, - extIDs store.ExternalIdentityStore, + userIdentities store.UserIdentityStore, userDNs store.UserDNStore, userMerges store.UserMergeStore, ) *Service { @@ -126,7 +126,7 @@ func NewWithStores( membershipOverrides: membershipOverrides, memberships: memberships, usages: usages, - extIDs: extIDs, + userIdentities: userIdentities, userDNs: userDNs, userMerges: userMerges, } diff --git a/pkg/service/user.go b/pkg/service/user.go index a17bf7a27..adc7f6a60 100644 --- a/pkg/service/user.go +++ b/pkg/service/user.go @@ -80,24 +80,23 @@ func (s *Service) GetUser(ctx context.Context, id string) (*models.User, error) return u, nil } -// GetUserByExternalIdentity returns the user owning the external identity -// uniquely identified by (source, externalID). Returns ErrNotFound when no -// such binding exists. -func (s *Service) GetUserByExternalIdentity(ctx context.Context, source, externalID string) (*models.User, error) { +// GetUserByUserIdentity returns the user owning the user identity uniquely +// identified by (source, externalID). Returns ErrNotFound when no such binding exists. +func (s *Service) GetUserByUserIdentity(ctx context.Context, source, externalID string) (*models.User, error) { if source == "" { return nil, fmt.Errorf("%w: source is required", ErrInvalidInput) } if externalID == "" { return nil, fmt.Errorf("%w: external_id is required", ErrInvalidInput) } - ext, err := s.extIDs.FindBySourceAndExternalID(ctx, source, externalID) + ident, err := s.userIdentities.FindBySourceAndExternalID(ctx, source, externalID) if err != nil { - return nil, fmt.Errorf("lookup external identity: %w", err) + return nil, fmt.Errorf("lookup user identity: %w", err) } - if ext == nil { + if ident == nil { return nil, ErrNotFound } - return s.GetUser(ctx, ext.UserID) + return s.GetUser(ctx, ident.UserID) } // GetUserByEmail retrieves a user by email. diff --git a/pkg/service/user_identity.go b/pkg/service/user_identity.go new file mode 100644 index 000000000..e9d9a7ce7 --- /dev/null +++ b/pkg/service/user_identity.go @@ -0,0 +1,190 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package service + +import ( + "context" + "database/sql" + "fmt" + + "github.com/apache/airavata-custos/pkg/events" + "github.com/apache/airavata-custos/pkg/models" +) + +// CreateUserIdentity persists a new user identity. If e.ID is empty, a new +// UUID is generated. The referenced user must already exist and the (source, external_id) pair is unique. +func (s *Service) CreateUserIdentity(ctx context.Context, e *models.UserIdentity) (*models.UserIdentity, error) { + if e == nil { + return nil, fmt.Errorf("%w: user identity is nil", ErrInvalidInput) + } + if e.UserID == "" { + return nil, fmt.Errorf("%w: user identity user_id is required", ErrInvalidInput) + } + if e.Source == "" { + return nil, fmt.Errorf("%w: user identity source is required", ErrInvalidInput) + } + if e.ExternalID == "" { + return nil, fmt.Errorf("%w: user identity external_id is required", ErrInvalidInput) + } + + if user, err := s.users.FindByID(ctx, e.UserID); err != nil { + return nil, fmt.Errorf("verify user: %w", err) + } else if user == nil { + return nil, fmt.Errorf("%w: user %q does not exist", ErrInvalidInput, e.UserID) + } + + if existing, err := s.userIdentities.FindBySourceAndExternalID(ctx, e.Source, e.ExternalID); err != nil { + return nil, fmt.Errorf("lookup user identity: %w", err) + } else if existing != nil { + return nil, fmt.Errorf("%w: user identity for source %q, external_id %q", ErrAlreadyExists, e.Source, e.ExternalID) + } + + if e.ID == "" { + e.ID = newID() + } + + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.userIdentities.Create(ctx, tx, e) + }); err != nil { + return nil, fmt.Errorf("create user identity: %w", err) + } + + s.eventBus.Publish(events.UserIdentityCreateEvent, e) + return e, nil +} + +// GetUserIdentity retrieves a user identity by ID. Returns ErrNotFound when no row matches. +func (s *Service) GetUserIdentity(ctx context.Context, id string) (*models.UserIdentity, error) { + e, err := s.userIdentities.FindByID(ctx, id) + if err != nil { + return nil, fmt.Errorf("get user identity: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// GetUserIdentityBySourceAndExternalID retrieves the unique user identity for +// the given (source, external_id) pair. +func (s *Service) GetUserIdentityBySourceAndExternalID(ctx context.Context, source, externalID string) (*models.UserIdentity, error) { + if source == "" { + return nil, fmt.Errorf("%w: source is required", ErrInvalidInput) + } + if externalID == "" { + return nil, fmt.Errorf("%w: external_id is required", ErrInvalidInput) + } + e, err := s.userIdentities.FindBySourceAndExternalID(ctx, source, externalID) + if err != nil { + return nil, fmt.Errorf("get user identity by source/external_id: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// GetUserIdentityByOIDCSub retrieves the first user identity matching the given OIDC subject. +func (s *Service) GetUserIdentityByOIDCSub(ctx context.Context, oidcSub string) (*models.UserIdentity, error) { + if oidcSub == "" { + return nil, fmt.Errorf("%w: oidc_sub is required", ErrInvalidInput) + } + e, err := s.userIdentities.FindByOIDCSub(ctx, oidcSub) + if err != nil { + return nil, fmt.Errorf("get user identity by oidc_sub: %w", err) + } + if e == nil { + return nil, ErrNotFound + } + return e, nil +} + +// ListUserIdentitiesForUser returns every user identity belonging to the given user. +func (s *Service) ListUserIdentitiesForUser(ctx context.Context, userID string) ([]models.UserIdentity, error) { + if userID == "" { + return nil, fmt.Errorf("%w: user_id is required", ErrInvalidInput) + } + out, err := s.userIdentities.FindByUser(ctx, userID) + if err != nil { + return nil, fmt.Errorf("list user identities by user: %w", err) + } + return out, nil +} + +// UpdateUserIdentity persists changes to an existing user identity. Fields +// left blank/zero on the supplied record fall back to the stored value. +func (s *Service) UpdateUserIdentity(ctx context.Context, e *models.UserIdentity) error { + if e == nil || e.ID == "" { + return fmt.Errorf("%w: user identity id is required", ErrInvalidInput) + } + existing, err := s.userIdentities.FindByID(ctx, e.ID) + if err != nil { + return fmt.Errorf("lookup user identity: %w", err) + } + if existing == nil { + return ErrNotFound + } + if e.UserID == "" { + e.UserID = existing.UserID + } + if e.Source == "" { + e.Source = existing.Source + } + if e.ExternalID == "" { + e.ExternalID = existing.ExternalID + } + if e.Email == "" { + e.Email = existing.Email + } + if e.OIDCSub == "" { + e.OIDCSub = existing.OIDCSub + } + if e.Metadata == "" { + e.Metadata = existing.Metadata + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.userIdentities.Update(ctx, tx, e) + }); err != nil { + return fmt.Errorf("update user identity: %w", err) + } + + s.eventBus.Publish(events.UserIdentityUpdateEvent, e) + return nil +} + +// DeleteUserIdentity removes a user identity by ID. +func (s *Service) DeleteUserIdentity(ctx context.Context, id string) error { + if id == "" { + return fmt.Errorf("%w: user identity id is required", ErrInvalidInput) + } + e, err := s.userIdentities.FindByID(ctx, id) + if err != nil { + return fmt.Errorf("lookup user identity: %w", err) + } + if e == nil { + return ErrNotFound + } + if err := s.inTx(ctx, func(tx *sql.Tx) error { + return s.userIdentities.Delete(ctx, tx, id) + }); err != nil { + return fmt.Errorf("delete user identity: %w", err) + } + + s.eventBus.Publish(events.UserIdentityDeleteEvent, e) + return nil +} diff --git a/pkg/service/user_merge.go b/pkg/service/user_merge.go index a2ffe2ad7..3d8860e02 100644 --- a/pkg/service/user_merge.go +++ b/pkg/service/user_merge.go @@ -33,7 +33,7 @@ import ( // happens in a single transaction. // // Moved to survivor (duplicates on the retiring user are dropped first): -// - external_identities +// - user_identities // - user_dns // - compute_cluster_users // - projects.project_pi_id @@ -85,8 +85,8 @@ func (s *Service) MergeUsers(ctx context.Context, survivingID, retiringID, reaso } if err := s.inTx(ctx, func(tx *sql.Tx) error { - if err := s.extIDs.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { - return fmt.Errorf("reassign external identities: %w", err) + if err := s.userIdentities.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { + return fmt.Errorf("reassign user identities: %w", err) } if err := s.userDNs.ReassignUser(ctx, tx, retiringID, survivingID); err != nil { return fmt.Errorf("reassign user dns: %w", err)
