This is an automated email from the ASF dual-hosted git repository.

DImuthuUpe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git


The following commit(s) were added to refs/heads/master by this push:
     new 034d0122b CILogon COmanage Provisioner Integration (#487)
034d0122b is described below

commit 034d0122ba21eb0a78413968907835e35f3a67c1
Author: Lahiru Jayathilake <[email protected]>
AuthorDate: Wed Jun 3 08:36:46 2026 -0400

    CILogon COmanage Provisioner Integration (#487)
    
    * Add posix username generation and per-cluster unique local_username
    
    * throw an error when the first and last names are empty when building the 
cluster username
    
    * POSIX username generation and updated AMIE account provisioning to use 
POSIX allocator
    
    * COmanage connector implementation
    
    * updated the baseline integration test for PI cluster provisioning and 
audit
---
 .../ACCESS/AMIE-Processor/config.yaml.example      |   3 -
 connectors/ACCESS/AMIE-Processor/config/config.go  |  16 +-
 .../ACCESS/AMIE-Processor/handler/handler.go       |  12 -
 .../ACCESS/AMIE-Processor/handler/posix_alloc.go   |  79 ++++++
 .../AMIE-Processor/handler/posix_alloc_test.go     | 100 ++++++++
 .../handler/request_account_create.go              |  10 +-
 .../handler/request_project_create.go              |  23 +-
 .../request_project_create_integration_test.go     |  16 ++
 .../pipeline/baseline_integration_test.go          |   5 +-
 .../testdata/scenarios/baseline.yaml               |  19 +-
 connectors/COmanage/Identity-Provisioner/README.md | 140 +++++++++++
 .../Identity-Provisioner/config.example.yaml       |  20 ++
 .../Identity-Provisioner/internal/client/client.go | 144 +++++++++++
 .../internal/client/client_test.go                 | 243 +++++++++++++++++++
 .../internal/client/group_members.go               | 121 ++++++++++
 .../Identity-Provisioner/internal/client/groups.go | 154 ++++++++++++
 .../internal/client/identifiers.go                 | 130 ++++++++++
 .../Identity-Provisioner/internal/client/people.go | 154 ++++++++++++
 .../internal/client/unix_cluster.go                |  93 ++++++++
 .../internal/operations/compose.go                 |  98 ++++++++
 .../internal/operations/compose_test.go            | 134 +++++++++++
 .../internal/operations/ensure_posix_account.go    | 265 +++++++++++++++++++++
 .../internal/operations/lookup.go                  |  84 +++++++
 .../internal/operations/orchestrator.go            |  41 ++++
 .../internal/subscribers/cluster_user.go           |  70 ++++++
 .../Identity-Provisioner/pkg/comanage/loader.go    | 108 +++++++++
 internal/connectors/loader.go                      |   7 +
 .../db/migrations/000002_compute_clusters.up.sql   |   1 +
 pkg/posix/username.go                              |  83 +++++++
 pkg/posix/username_test.go                         | 156 ++++++++++++
 pkg/service/compute_cluster_user.go                |  19 +-
 31 files changed, 2507 insertions(+), 41 deletions(-)

diff --git a/connectors/ACCESS/AMIE-Processor/config.yaml.example 
b/connectors/ACCESS/AMIE-Processor/config.yaml.example
index af0149ffd..528c9039f 100644
--- a/connectors/ACCESS/AMIE-Processor/config.yaml.example
+++ b/connectors/ACCESS/AMIE-Processor/config.yaml.example
@@ -19,6 +19,3 @@ amie:
 log:
   level: "info"
   format: "text"
-
-provisioner:
-  type: "noop"
diff --git a/connectors/ACCESS/AMIE-Processor/config/config.go 
b/connectors/ACCESS/AMIE-Processor/config/config.go
index 7daa093c7..a365aae1e 100644
--- a/connectors/ACCESS/AMIE-Processor/config/config.go
+++ b/connectors/ACCESS/AMIE-Processor/config/config.go
@@ -27,11 +27,10 @@ import (
 )
 
 type Config struct {
-       Server      ServerConfig      `yaml:"server"`
-       Database    DatabaseConfig    `yaml:"database"`
-       AMIE        AMIEConfig        `yaml:"amie"`
-       Log         LogConfig         `yaml:"log"`
-       Provisioner ProvisionerConfig `yaml:"provisioner"`
+       Server   ServerConfig   `yaml:"server"`
+       Database DatabaseConfig `yaml:"database"`
+       AMIE     AMIEConfig     `yaml:"amie"`
+       Log      LogConfig      `yaml:"log"`
 }
 
 type ServerConfig struct {
@@ -60,10 +59,6 @@ type LogConfig struct {
        Format string `yaml:"format"` // "text" or "json"
 }
 
-type ProvisionerConfig struct {
-       Type string `yaml:"type"` // "noop" or "slurm"
-}
-
 // Load reads config from a YAML file and applies environment variable 
overrides.
 func Load(path string) (*Config, error) {
        data, err := os.ReadFile(path)
@@ -113,9 +108,6 @@ func applyDefaults(cfg *Config) {
        if cfg.Log.Format == "" {
                cfg.Log.Format = "text"
        }
-       if cfg.Provisioner.Type == "" {
-               cfg.Provisioner.Type = "noop"
-       }
 }
 
 func applyEnvOverrides(cfg *Config) {
diff --git a/connectors/ACCESS/AMIE-Processor/handler/handler.go 
b/connectors/ACCESS/AMIE-Processor/handler/handler.go
index b4eca6db2..5153a4ac6 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/handler.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/handler.go
@@ -19,9 +19,7 @@ package handler
 
 import (
        "context"
-       "crypto/rand"
        "database/sql"
-       "encoding/hex"
        "errors"
        "fmt"
        "strconv"
@@ -193,16 +191,6 @@ func ensureOrganization(ctx context.Context, svc 
*service.Service, code, name st
        })
 }
 
-// generateTempPosixUsername returns a placeholder posix username for a
-// freshly provisioned ComputeClusterUser.
-//
-// TODO: replace with a real policy
-func generateTempPosixUsername() string {
-       var b [4]byte
-       _, _ = rand.Read(b[:])
-       return "amie-" + hex.EncodeToString(b[:])
-}
-
 // getInt64 reads a string-encoded integer from a packet body field. AMIE
 // transmits numeric fields like ServiceUnitsAllocated as JSON strings.
 func getInt64(body map[string]any, key string) (int64, error) {
diff --git a/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go 
b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go
new file mode 100644
index 000000000..b1fb3ef6b
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handler
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "strconv"
+
+       "github.com/apache/airavata-custos/pkg/models"
+       "github.com/apache/airavata-custos/pkg/posix"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+func allocateAndCreateClusterUser(ctx context.Context, svc *service.Service, 
clusterID, userID string) (*models.ComputeClusterUser, error) {
+       user, err := svc.GetUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("lookup user %q: %w", userID, err)
+       }
+
+       base, truncated, err := posix.BuildBase(user, posix.Prefix())
+       if err != nil {
+               _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+                       EventType: "PosixUsernameUnbuildable",
+                       EntityID:  userID,
+                       Details:   err.Error(),
+               })
+               return nil, err
+       }
+       if truncated {
+               _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+                       EventType: "PosixUsernameTruncated",
+                       EntityID:  userID,
+                       Details:   base,
+               })
+       }
+
+       for n := 0; n < posix.MaxCollisionSuffix; n++ {
+               candidate := base
+               if n > 0 {
+                       candidate = base + strconv.Itoa(n+1)
+               }
+               ccu, err := svc.CreateComputeClusterUser(ctx, 
&models.ComputeClusterUser{
+                       ComputeClusterID: clusterID,
+                       UserID:           userID,
+                       LocalUsername:    candidate,
+               })
+               if err == nil {
+                       return ccu, nil
+               }
+               if errors.Is(err, service.ErrAlreadyExists) {
+                       continue
+               }
+               return nil, err
+       }
+
+       _, _ = svc.CreateAuditEvent(ctx, &models.AuditEvent{
+               EventType: "PosixUsernameAllocatorExhausted",
+               EntityID:  userID,
+               Details:   base,
+       })
+       return nil, fmt.Errorf("posix username allocator exhausted for base 
%q", base)
+}
diff --git a/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go 
b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go
new file mode 100644
index 000000000..b171c7cb6
--- /dev/null
+++ b/connectors/ACCESS/AMIE-Processor/handler/posix_alloc_test.go
@@ -0,0 +1,100 @@
+//go:build integration
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package handler
+
+import (
+       "context"
+       "fmt"
+       "strings"
+       "sync"
+       "testing"
+
+       "github.com/google/uuid"
+
+       "github.com/apache/airavata-custos/pkg/models"
+       "github.com/apache/airavata-custos/pkg/posix"
+)
+
+func TestAllocateAndCreateClusterUser_CollisionRetry(t *testing.T) {
+       database := setupTestDB(t)
+       svc := newTestCoreService(database)
+       ctx := context.Background()
+
+       org, err := svc.CreateOrganization(ctx, &models.Organization{
+               OriginatedID: uuid.NewString(),
+               Name:         "posix-alloc-test-org",
+       })
+       if err != nil {
+               t.Fatalf("create org: %v", err)
+       }
+
+       const n = 10
+       userIDs := make([]string, n)
+       for i := range userIDs {
+               u, err := svc.CreateUser(ctx, &models.User{
+                       OrganizationID: org.ID,
+                       FirstName:      "Collision",
+                       LastName:       "Target",
+                       Email:          fmt.Sprintf("col-%[email protected]", 
uuid.NewString()),
+               })
+               if err != nil {
+                       t.Fatalf("create user %d: %v", i, err)
+               }
+               userIDs[i] = u.ID
+       }
+
+       type result struct {
+               username string
+               err      error
+       }
+       results := make([]result, n)
+       var wg sync.WaitGroup
+       for i, uid := range userIDs {
+               wg.Add(1)
+               go func(idx int, userID string) {
+                       defer wg.Done()
+                       ccu, err := allocateAndCreateClusterUser(ctx, svc, 
testClusterID, userID)
+                       if err != nil {
+                               results[idx] = result{err: err}
+                               return
+                       }
+                       results[idx] = result{username: ccu.LocalUsername}
+               }(i, uid)
+       }
+       wg.Wait()
+
+       seen := make(map[string]bool)
+       for i, r := range results {
+               if r.err != nil {
+                       t.Errorf("goroutine %d: %v", i, r.err)
+                       continue
+               }
+               if seen[r.username] {
+                       t.Errorf("duplicate username %q across goroutines", 
r.username)
+               }
+               seen[r.username] = true
+               if !strings.HasPrefix(r.username, posix.Prefix()+"-") {
+                       t.Errorf("username %q does not start with %q", 
r.username, posix.Prefix()+"-")
+               }
+       }
+       if len(seen) != n {
+               t.Errorf("distinct usernames: got %d, want %d", len(seen), n)
+       }
+}
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
index a42a3ac02..9896916f5 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_account_create.go
@@ -160,8 +160,8 @@ func (h *RequestAccountCreateHandler) ensureUser(ctx 
context.Context, body map[s
        return user, nil
 }
 
-// ensureComputeClusterUser returns the user's existing cluster mapping on the
-// configured cluster, or provisions a fresh one with a temp posix username.
+// ensureComputeClusterUser returns the user's existing cluster mapping or
+// provisions a new one via the POSIX allocator.
 func (h *RequestAccountCreateHandler) ensureComputeClusterUser(ctx 
context.Context, userID string) (*models.ComputeClusterUser, error) {
        existing, err := h.svc.ListComputeClusterUsersByUser(ctx, userID)
        if err != nil {
@@ -172,11 +172,7 @@ func (h *RequestAccountCreateHandler) 
ensureComputeClusterUser(ctx context.Conte
                        return &a, nil
                }
        }
-       return h.svc.CreateComputeClusterUser(ctx, &models.ComputeClusterUser{
-               UserID:           userID,
-               ComputeClusterID: h.clusterID,
-               LocalUsername:    generateTempPosixUsername(),
-       })
+       return allocateAndCreateClusterUser(ctx, h.svc, h.clusterID, userID)
 }
 
 // ensureMembership returns the existing (allocation, user) membership or
diff --git a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
index 3d7dff92e..178f2be4e 100644
--- a/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
+++ b/connectors/ACCESS/AMIE-Processor/handler/request_project_create.go
@@ -78,6 +78,14 @@ func (h *RequestProjectCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                return fmt.Errorf("request_project_create: audit CREATE_PERSON: 
%w", err)
        }
 
+       piClusterUser, err := h.ensurePIClusterUser(ctx, pi.ID)
+       if err != nil {
+               return fmt.Errorf("request_project_create: ensure PI cluster 
user: %w", err)
+       }
+       if err := h.auditSvc.Log(ctx, tx, packet.ID, eventID, 
model.AuditCreateAccount, "compute_cluster_user", piClusterUser.ID, 
piClusterUser.LocalUsername); err != nil {
+               return fmt.Errorf("request_project_create: audit CREATE_ACCOUNT 
(PI): %w", err)
+       }
+
        project, err := h.ensureProject(ctx, projectOriginatedID, grantNumber, 
pi.ID)
        if err != nil {
                return fmt.Errorf("request_project_create: ensure project: %w", 
err)
@@ -98,7 +106,7 @@ func (h *RequestProjectCreateHandler) Handle(ctx 
context.Context, tx *sql.Tx, pa
                "ProjectID":         project.ID,
                "GrantNumber":       grantNumber,
                "PiPersonID":        pi.ID,
-               "PiRemoteSiteLogin": piGlobalID,
+               "PiRemoteSiteLogin": piClusterUser.LocalUsername,
                "ResourceList":      getResourceList(body),
        }
        reply := map[string]any{"type": "notify_project_create", "body": 
replyBody}
@@ -142,6 +150,19 @@ func (h *RequestProjectCreateHandler) ensurePIUser(ctx 
context.Context, body map
        return user, nil
 }
 
+func (h *RequestProjectCreateHandler) ensurePIClusterUser(ctx context.Context, 
userID string) (*models.ComputeClusterUser, error) {
+       existing, err := h.svc.ListComputeClusterUsersByUser(ctx, userID)
+       if err != nil {
+               return nil, fmt.Errorf("list compute cluster users: %w", err)
+       }
+       for _, a := range existing {
+               if a.ComputeClusterID == h.clusterID {
+                       return &a, nil
+               }
+       }
+       return allocateAndCreateClusterUser(ctx, h.svc, h.clusterID, userID)
+}
+
 func (h *RequestProjectCreateHandler) ensureProject(ctx context.Context, 
originatedID, grantNumber, piID string) (*models.Project, error) {
        if p, err := h.svc.GetProjectByOriginatedID(ctx, originatedID); err == 
nil {
                return p, nil
diff --git 
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
index 92b5e345c..08110d500 100644
--- 
a/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
+++ 
b/connectors/ACCESS/AMIE-Processor/handler/request_project_create_integration_test.go
@@ -187,6 +187,7 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
 
        for _, action := range []model.AuditAction{
                model.AuditCreatePerson,
+               model.AuditCreateAccount,
                model.AuditCreateProject,
                model.AuditCreateAllocation,
                model.AuditReplySent,
@@ -196,6 +197,19 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
                }
        }
 
+       var piCU struct {
+               LocalUsername string `db:"local_username"`
+       }
+       if err := database.Get(&piCU,
+               "SELECT local_username FROM compute_cluster_users WHERE user_id 
= ? AND compute_cluster_id = ?",
+               user.ID, testClusterID,
+       ); err != nil {
+               t.Fatalf("read PI compute_cluster_user: %v", err)
+       }
+       if piCU.LocalUsername == "" {
+               t.Errorf("PI compute_cluster_user.local_username: empty")
+       }
+
        if got, want := amie.lastReplyType(), "notify_project_create"; got != 
want {
                t.Fatalf("reply type: got %q, want %q", got, want)
        }
@@ -211,6 +225,8 @@ func TestRequestProjectCreate_HappyPath(t *testing.T) {
        }
        if got, _ := reply["PiRemoteSiteLogin"].(string); got == "" {
                t.Errorf("reply.PiRemoteSiteLogin: empty; required value")
+       } else if got != piCU.LocalUsername {
+               t.Errorf("reply.PiRemoteSiteLogin: got %q, want %q", got, 
piCU.LocalUsername)
        }
 }
 
diff --git 
a/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go 
b/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
index 0ad15af13..a72678644 100644
--- a/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
+++ b/connectors/ACCESS/AMIE-Processor/pipeline/baseline_integration_test.go
@@ -56,8 +56,8 @@ func TestPipeline_BaselineDeterminism(t *testing.T) {
                {"compute_allocations", 2},            // 
tables.compute_allocations.total_count
                {"compute_allocation_diffs", 1},       // 
tables.compute_allocation_diffs.total_count
                {"amie_user_dns", 2},                  // 
tables.amie_user_dns.total_count
-               {"amie_audit_log", 23},                // audit_log.total_count
-               {"compute_cluster_users", 0},          // not_expected
+               {"amie_audit_log", 26},                // audit_log.total_count
+               {"compute_cluster_users", 1},          // 
tables.compute_cluster_users.total_count (survivor's PI CCU; merge dedups Sam's)
                {"compute_allocation_memberships", 0}, // not_expected
        }
        if decoded != 9 {
@@ -75,6 +75,7 @@ func TestPipeline_BaselineDeterminism(t *testing.T) {
                want   int
        }{
                {"CREATE_PERSON", 3},
+               {"CREATE_ACCOUNT", 3},
                {"CREATE_PROJECT", 3},
                {"CREATE_ALLOCATION", 3},
                {"REPLY_SENT", 8},
diff --git a/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml 
b/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
index 6768f4e66..878da6dd6 100644
--- a/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
+++ b/connectors/ACCESS/AMIE-Processor/testdata/scenarios/baseline.yaml
@@ -10,8 +10,10 @@
 # NOT covered (handlers need the Custos-assigned project UUID, which the mock
 # can't supply): request_account_create, request_account_inactivate,
 # request_account_reactivate, request_project_inactivate, 
request_project_reactivate.
-# Tables left empty as a result: compute_cluster_users, 
compute_allocation_memberships.
+# Tables left empty as a result: compute_allocation_memberships.
 # Status-flip code paths on projects/allocations/memberships are also 
unexercised.
+# compute_cluster_users IS populated because request_project_create provisions
+# a PI cluster user via the POSIX allocator.
 
 scenario:
   name: baseline
@@ -121,10 +123,22 @@ expectations:
         - { dn: "/C=US/O=Baseline Org/CN=Sam Second",
             user_id: { from: "users.id where 
[email protected]" } }
 
+    compute_cluster_users:
+      # request_project_create now provisions a CCU for the PI via the POSIX
+      # allocator. BL-001 (#1 + supplement) creates Pat's CCU once. BL-002
+      # (#3) creates Sam's CCU. The merge then deletes Sam's CCU on Pat's
+      # cluster (cluster overlap dedup), leaving Pat's row only.
+      total_count: 1
+      rows:
+        - { compute_cluster_id: "${AMIE_CLUSTER_ID}",
+            local_username: custos-pfirst,
+            user_id: { from: "users.id where 
[email protected]" } }
+
   audit_log:
-    total_count: 23
+    total_count: 26
     by_action:
       CREATE_PERSON: 3        # all 3 request_project_create packets audit 
CREATE_PERSON (re-use audited too)
+      CREATE_ACCOUNT: 3       # PI CCU provisioning audited on every 
request_project_create (re-use audited too)
       CREATE_PROJECT: 3
       CREATE_ALLOCATION: 3    # supplement-delivery still audits 
CREATE_ALLOCATION (known M1: diff is the actual write)
       REPLY_SENT: 8           # inform_transaction_complete does not reply
@@ -145,7 +159,6 @@ expectations:
       - PERMANENTLY_FAILED
 
   not_expected:
-    - compute_cluster_users
     - compute_allocation_memberships
     - amie_processing_errors
 
diff --git a/connectors/COmanage/Identity-Provisioner/README.md 
b/connectors/COmanage/Identity-Provisioner/README.md
new file mode 100644
index 000000000..9df9cf4be
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/README.md
@@ -0,0 +1,140 @@
+# COmanage Identity-Provisioner
+
+Connector that bridges Custos to a COmanage Registry. When the core service
+emits `ComputeClusterUserCreateEvent` for a cluster this connector services,
+the orchestrator ensures the user has a fully provisioned POSIX identity in
+COmanage: a CoPerson, a per-user CoGroup, the matching Identifiers, and a
+`UnixClusterAccount` block on the target UnixCluster.
+
+## How it loads
+
+The loader is wired from `internal/connectors/loader.go` alongside SLURM and
+AMIE. On startup:
+
+1. `comanage.LoadConnector` reads seven required env vars:
+   `COMANAGE_REGISTRY_URL`, `COMANAGE_CO_ID`, `COMANAGE_API_USER`,
+   `COMANAGE_API_KEY`, `COMANAGE_PERSON_ID_TYPE`,
+   `COMANAGE_UNIX_CLUSTER_ID`, `CUSTOS_CLUSTER_ID`.
+2. If any are missing it logs `comanage provisioner: required env vars not 
set; skipping`
+   and returns `nil`. This is the documented way to disable the connector in a
+   deployment without code changes.
+
+`COMANAGE_PERSON_ID_TYPE` is the Identifier Type configured in the registry's
+Identifier Types screen that the connector uses to look up and tag a CoPerson.
+Set it to whatever string your COmanage instance uses for the per-person ID.
+3. If all six are present it constructs an HTTP client and a
+   `ClusterUserSubscriber`, then registers it against the shared event bus.
+   The subscriber filters by `CustosClusterID` so a deployment that services
+   multiple clusters can run multiple connector instances side by side.
+
+See `config.example.yaml` for the full env var reference (required + optional).
+
+## What the orchestrator does
+
+For each event, the orchestrator runs the following calls in order. Steps that
+find an existing record are short-circuited; steps that create something
+tolerate "already exists" responses so re-runs after a partial failure are
+idempotent.
+
+| # | Step | What it does | Endpoint |
+|---|------|--------------|----------|
+| 1 | Lookup CoPerson | Look up `comanage_id` stored in 
`user_identities(source="comanage")`, fall back to REST email search, otherwise 
POST a new CoPerson. | Core `GET /people/<comanage_id>`, REST `co_people.json`, 
Core `POST /people` |
+| 2 | Store `comanage_id` | Persist the COmanage identifier in 
`user_identities` so step 1 finds it next time. | Core service 
`CreateUserIdentity` |
+| 3 | Read composite | Pull the full Core composite to read `uidnumber` and 
`CoPerson.meta.id`. | Core `GET /people/<comanage_id>` |
+| 4 | Per-user CoGroup | Find or create a CoGroup named after the user's local 
username (`GroupType:"CL"`, `Auto:false`). | REST `co_groups.json` |
+| 5 | groupname Identifier | Attach a `type:"uid"` Identifier (the local 
username string) to the CoGroup. | REST `identifiers.json` |
+| 6 | gidnumber Identifier | Attach a `type:"gidnumber"` Identifier (numeric, 
mirrors `uidnumber`) to the CoGroup. | REST `identifiers.json` |
+| 7 | CoGroupMember | Join the CoPerson to the CoGroup as member + owner. | 
REST `co_group_members.json` |
+| 8 | UnixClusterGroup | Attach the CoGroup to the configured UnixCluster. A 
4xx here is treated as "already attached". | REST `unix_cluster_groups.json` |
+| 9 | UnixClusterAccount | Re-GET a fresh composite, merge a 
`UnixClusterAccount` block, then full-composite PUT. The merge round-trips 
unmodeled fields as `json.RawMessage` so `deleteOmitted` cannot drop attributes 
the connector does not understand. | Core `GET /people/<comanage_id>` then `PUT 
/people/<comanage_id>` |
+
+Step 9 uses `sync_mode:"M"` (Manual) so a downstream provisioning plugin
+cannot overwrite the block when Custos is the source of truth.
+
+## Audit events
+
+The connector emits the following `audit_events.event_type` values via the
+core service. `audit_events.entity_id` is always the 
`compute_cluster_users.id`.
+
+| Event type | When |
+|------------|------|
+| `ComanageCoPersonCreated` | A new CoPerson was POSTed in step 1. Lookups 
that resolved to an existing CoPerson do not fire this. |
+| `ComanageClusterAccountAttached` | The sequence completed and the user has a 
UnixClusterAccount block on the configured UnixCluster. |
+| `ComanageProvisioningFailed` | Any step returned an error. The `details` 
field carries `step=<name> err=<message>` so the dead-letter is tractable from 
the audit log alone. |
+
+The AMIE-Processor side of this flow also emits:
+
+| Event type | When |
+|------------|------|
+| `PosixUsernameTruncated` | The base username derived from the user's name 
was longer than the POSIX cap and got truncated. |
+| `PosixUsernameUnbuildable` | First and last name both normalized to empty so 
no candidate could be built. |
+| `PosixUsernameAllocatorExhausted` | All collision suffixes were exhausted. 
The cluster-user row is not created. |
+
+## End-to-end audit walkthrough
+
+A successful provisioning of a new user produces this sequence for a single
+`compute_cluster_users.id`:
+
+1. AMIE side: `PosixUsernameTruncated` may fire if the user's given and family
+   name exceeded the POSIX cap.
+2. AMIE side: the cluster-user row is committed and the event bus delivers
+   `ComputeClusterUserCreateEvent`.
+3. COmanage side: `ComanageCoPersonCreated` fires if step 1 went through the
+   POST path. Idempotent re-runs against an existing CoPerson skip this event.
+4. COmanage side: `ComanageClusterAccountAttached` fires after step 9, with
+   `details` of the form `comanage_id=<id> username=<local> uid=<n>`.
+
+A failure produces `ComanageProvisioningFailed` with
+`details=step=<n> err=<msg>` in place of the success event.
+
+## Username allocation
+
+Local POSIX usernames are allocated by `pkg/posix`:
+
+1. `BuildBase` derives a candidate from `Prefix() + first-initial + last-name`,
+   normalised to lowercase ASCII, capped at the POSIX length limit. Truncation
+   fires a `PosixUsernameTruncated` audit event. If both names normalize to
+   empty, the allocator emits `PosixUsernameUnbuildable` and returns an error.
+2. The AMIE handler attempts `CreateComputeClusterUser` with the base. If the
+   composite UNIQUE on `(compute_cluster_id, local_username)` rejects it, the
+   handler retries with `base + "2"`, `base + "3"`, and so on up to
+   `MaxCollisionSuffix`.
+3. If suffixes are exhausted a `PosixUsernameAllocatorExhausted` event is
+   emitted and the handler returns an error.
+
+## Local development
+
+The connector reads config from env vars. To run it against a test registry:
+
+```bash
+export COMANAGE_REGISTRY_URL=https://<your-registry>/registry
+export COMANAGE_CO_ID=<numeric CO id>
+export COMANAGE_API_USER=<api user for that CO>
+export COMANAGE_API_KEY=<api key for that user>
+export COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type, per your registry>
+export COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id>
+export CUSTOS_CLUSTER_ID=<UUID from the compute_clusters table>
+go run ./cmd/server
+```
+
+Without those vars set the connector skips registration silently, so the rest
+of Custos runs unaffected.
+
+## Test surface
+
+```bash
+# unit
+go test ./connectors/COmanage/Identity-Provisioner/...
+
+# integration (requires a real registry; skipped by default)
+go test -tags integration ./connectors/COmanage/Identity-Provisioner/...
+```
+
+The unit tests cover:
+
+- HTTP client transport: 5xx retry with exponential backoff, error
+  classification (`ErrNotFound`, `ErrAuth401`, `HTTPError`).
+- Core API and REST wrappers via `httptest.NewServer`.
+- The `compose.go` merge layer: a fixture composite is merged with a
+  `UnixClusterAccount` block, then asserted to preserve every key from the
+  original.
diff --git a/connectors/COmanage/Identity-Provisioner/config.example.yaml 
b/connectors/COmanage/Identity-Provisioner/config.example.yaml
new file mode 100644
index 000000000..9ed3b990b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/config.example.yaml
@@ -0,0 +1,20 @@
+# COmanage Identity-Provisioner connector
+#
+# This connector is configured via environment variables. Below is the full
+# list with example values; see README.md for semantics. Missing required
+# vars causes the loader to skip subscriber registration silently.
+
+# REQUIRED ---------------------------------------------------------------
+# COMANAGE_REGISTRY_URL=https://<your-registry>/registry
+# COMANAGE_CO_ID=<numeric CO id>
+# COMANAGE_API_USER=<api user for that CO>
+# COMANAGE_API_KEY=<api key for that user>
+# COMANAGE_PERSON_ID_TYPE=<CoPerson Identifier Type configured in the registry>
+# COMANAGE_UNIX_CLUSTER_ID=<numeric UnixCluster id>
+# CUSTOS_CLUSTER_ID=<UUID of the compute_cluster row this connector services>
+
+# OPTIONAL ---------------------------------------------------------------
+# COMANAGE_DEFAULT_SHELL=/bin/bash
+# COMANAGE_HOMEDIR_PREFIX=/home/
+# COMANAGE_HTTP_TIMEOUT=30s
+# POSIX_USERNAME_PREFIX=custos # read by pkg/posix; default "custos"
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/client.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/client.go
new file mode 100644
index 000000000..bf95a998c
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/client.go
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package client wraps the COmanage Core API and REST surfaces. Auth is HTTP
+// Basic.
+package client
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "io"
+       "net/http"
+       "strings"
+       "time"
+)
+
+// restAPIVersion is the COmanage REST v1 model version.
+const restAPIVersion = "1.0"
+
+type Config struct {
+       RegistryURL string
+       COID        int
+       APIUser     string
+       APIKey      string
+       // PersonIDType is the COmanage Identifier Type used to look up and tag 
a
+       // CoPerson (e.g. the type name configured in the registry's Identifier
+       // Types). Required value.
+       PersonIDType    string
+       UnixClusterID   int
+       CustosClusterID string
+       DefaultShell    string
+       HomedirPrefix   string
+       HTTPTimeout     time.Duration
+}
+
+type Client struct {
+       cfg  Config
+       http *http.Client
+}
+
+func New(cfg Config) *Client {
+       timeout := cfg.HTTPTimeout
+       if timeout == 0 {
+               timeout = 30 * time.Second
+       }
+       return &Client{
+               cfg:  cfg,
+               http: &http.Client{Timeout: timeout},
+       }
+}
+
+func (c *Client) Config() Config { return c.cfg }
+
+// coreAPI and restAPI are the two URL families. Centralized so an upstream
+// version bump flips one line.
+func (c *Client) coreAPI(path string) string {
+       return fmt.Sprintf("%s/api/co/%d/core/v1%s", c.cfg.RegistryURL, 
c.cfg.COID, path)
+}
+
+func (c *Client) restAPI(path string) string {
+       return c.cfg.RegistryURL + path
+}
+
+// Do issue an authenticated request and retries 5xx with backoff.
+func (c *Client) Do(method, url string, body []byte) (*http.Response, []byte, 
error) {
+       resp, respBody, err := c.doOnce(method, url, body, c.cfg.APIKey)
+       if err == nil && resp.StatusCode >= 500 && resp.StatusCode < 600 {
+               // 1s, 2s, 4s
+               for attempt := 1; attempt <= 3; attempt++ {
+                       _ = resp.Body.Close()
+                       time.Sleep(time.Duration(1<<uint(attempt-1)) * 
time.Second)
+                       resp, respBody, err = c.doOnce(method, url, body, 
c.cfg.APIKey)
+                       if err != nil || resp.StatusCode < 500 {
+                               break
+                       }
+               }
+       }
+       return resp, respBody, err
+}
+
+func (c *Client) doOnce(method, url string, body []byte, apiKey string) 
(*http.Response, []byte, error) {
+       var rdr io.Reader
+       if body != nil {
+               rdr = bytes.NewReader(body)
+       }
+       req, err := http.NewRequest(method, url, rdr)
+       if err != nil {
+               return nil, nil, fmt.Errorf("build request: %w", err)
+       }
+       if body != nil {
+               req.Header.Set("Content-Type", "application/json")
+       }
+       req.SetBasicAuth(c.cfg.APIUser, apiKey)
+
+       resp, err := c.http.Do(req)
+       if err != nil {
+               return nil, nil, fmt.Errorf("http: %w", err)
+       }
+       respBody, readErr := io.ReadAll(resp.Body)
+       _ = resp.Body.Close()
+       if readErr != nil {
+               return resp, nil, fmt.Errorf("read response: %w", readErr)
+       }
+       return resp, respBody, nil
+}
+
+var (
+       ErrAuth401  = errors.New("comanage: 401 unauthorized")
+       ErrNotFound = errors.New("comanage: not found")
+)
+
+type HTTPError struct {
+       Method     string
+       URL        string
+       StatusCode int
+       Body       string
+}
+
+func (e *HTTPError) Error() string {
+       return fmt.Sprintf("comanage %s %s: %d: %s", e.Method, e.URL, 
e.StatusCode, truncate(e.Body, 200))
+}
+
+func truncate(s string, n int) string {
+       s = strings.TrimSpace(s)
+       if len(s) <= n {
+               return s
+       }
+       return s[:n] + "..."
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go
new file mode 100644
index 000000000..471aa606b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/client_test.go
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "sync/atomic"
+       "testing"
+       "time"
+)
+
+func newTestClient(t *testing.T, baseURL string) *Client {
+       t.Helper()
+       return New(Config{
+               RegistryURL:   baseURL,
+               COID:          2,
+               APIUser:       "co_2.testuser",
+               APIKey:        "key-primary",
+               UnixClusterID: 1,
+               HTTPTimeout:   5 * time.Second,
+       })
+}
+
+func TestCreatePerson_SendsBasicAuthAndDecodes(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               if r.Method != http.MethodPost || 
!strings.HasSuffix(r.URL.Path, "/api/co/2/core/v1/people") {
+                       t.Errorf("unexpected request: %s %s", r.Method, 
r.URL.Path)
+               }
+               user, pass, ok := r.BasicAuth()
+               if !ok || user != "co_2.testuser" || pass != "key-primary" {
+                       t.Errorf("auth header: ok=%v user=%q pass=%q", ok, 
user, pass)
+               }
+               w.WriteHeader(http.StatusCreated)
+               _, _ = io.WriteString(w, 
`[{"identifier":"Person100099","type":"comanage_id","login":false,"status":"A"}]`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL+"/registry")
+       c.cfg.RegistryURL = srv.URL // override to match server root
+       out, err := c.CreatePerson([]byte(`{}`))
+       if err != nil {
+               t.Fatalf("CreatePerson: %v", err)
+       }
+       if len(out) != 1 || out[0].Identifier != "Person100099" || out[0].Type 
!= "comanage_id" {
+               t.Errorf("got %+v", out)
+       }
+}
+
+func TestDo_RetriesOn5xx(t *testing.T) {
+       var calls atomic.Int32
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               n := calls.Add(1)
+               if n < 3 {
+                       w.WriteHeader(http.StatusInternalServerError)
+                       return
+               }
+               w.WriteHeader(http.StatusOK)
+               _, _ = io.WriteString(w, "ok")
+       }))
+       defer srv.Close()
+
+       c := New(Config{
+               RegistryURL: srv.URL,
+               COID:        2,
+               APIUser:     "u",
+               APIKey:      "k",
+               HTTPTimeout: 2 * time.Second,
+       })
+       resp, body, err := c.Do("GET", srv.URL+"/anything", nil)
+       if err != nil {
+               t.Fatalf("Do: %v", err)
+       }
+       if resp.StatusCode != http.StatusOK {
+               t.Errorf("status: got %d", resp.StatusCode)
+       }
+       if got := string(body); got != "ok" {
+               t.Errorf("body: %q", got)
+       }
+       if calls.Load() != 3 {
+               t.Errorf("calls: got %d, want 3", calls.Load())
+       }
+}
+
+func TestCreateCoGroup_EnvelopeShape(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               if !strings.HasSuffix(r.URL.Path, "/co_groups.json") {
+                       t.Errorf("unexpected path: %s", r.URL.Path)
+               }
+               body, _ := io.ReadAll(r.Body)
+               var req CoGroupCreateRequest
+               if err := json.Unmarshal(body, &req); err != nil {
+                       t.Fatalf("decode request: %v", err)
+               }
+               if req.RequestType != "CoGroups" || req.Version != "1.0" || 
len(req.CoGroups) != 1 {
+                       t.Errorf("envelope: %+v", req)
+               }
+               g := req.CoGroups[0]
+               if g.CoId != 2 || g.Name != "custos-test" || g.GroupType != 
"CL" || g.Auto {
+                       t.Errorf("group fields: %+v", g)
+               }
+               w.WriteHeader(http.StatusCreated)
+               _, _ = io.WriteString(w, 
`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"CoGroup","Id":"42"}`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL)
+       c.cfg.RegistryURL = srv.URL
+       id, err := c.CreateCoGroup("custos-test", "Primary group for 
custos-test")
+       if err != nil {
+               t.Fatalf("CreateCoGroup: %v", err)
+       }
+       if id != 42 {
+               t.Errorf("id: got %d, want 42", id)
+       }
+}
+
+func TestCreateIdentifierOnGroup_UsesNestedPersonShape(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               body, _ := io.ReadAll(r.Body)
+               var req IdentifierCreateRequest
+               if err := json.Unmarshal(body, &req); err != nil {
+                       t.Fatalf("decode: %v", err)
+               }
+               ident := req.Identifiers[0]
+               if ident.Person.Type != "Group" || ident.Person.Id != 25 {
+                       t.Errorf("Person field: %+v (need {Type:Group,Id:25})", 
ident.Person)
+               }
+               w.WriteHeader(http.StatusCreated)
+               _, _ = io.WriteString(w, 
`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"Identifier","Id":"7"}`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL)
+       c.cfg.RegistryURL = srv.URL
+       id, err := c.CreateIdentifierOnGroup("custos-alice", "uid", 25)
+       if err != nil {
+               t.Fatalf("CreateIdentifierOnGroup: %v", err)
+       }
+       if id != 7 {
+               t.Errorf("id: %d", id)
+       }
+}
+
+func TestCreateUnixClusterGroup_URLAndBody(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               if !strings.HasSuffix(r.URL.Path, 
"/unix_cluster/unix_cluster_groups.json") {
+                       t.Errorf("unexpected path: %s", r.URL.Path)
+               }
+               body, _ := io.ReadAll(r.Body)
+               var req UnixClusterGroupCreateRequest
+               if err := json.Unmarshal(body, &req); err != nil {
+                       t.Fatalf("decode: %v", err)
+               }
+               one := req.UnixClusterGroups[0]
+               if one.UnixClusterId != 1 || one.CoGroupId != 25 {
+                       t.Errorf("fields: %+v", one)
+               }
+               w.WriteHeader(http.StatusCreated)
+               _, _ = io.WriteString(w, 
`{"ResponseType":"NewObject","Version":"1.0","ObjectType":"UnixClusterGroup","Id":"3"}`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL)
+       c.cfg.RegistryURL = srv.URL
+       id, err := c.CreateUnixClusterGroup(25)
+       if err != nil {
+               t.Fatalf("CreateUnixClusterGroup: %v", err)
+       }
+       if id != 3 {
+               t.Errorf("id: %d", id)
+       }
+}
+
+func TestGetPersonComposite_PreservesRawJSON(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               if r.Method != http.MethodGet || !strings.Contains(r.URL.Path, 
"/api/co/2/core/v1/people/Person100099") {
+                       t.Errorf("unexpected request: %s %s", r.Method, 
r.URL.Path)
+               }
+               w.Header().Set("Content-Type", "application/json")
+               _, _ = io.WriteString(w, 
`{"CoPerson":{"meta":{"id":99}},"Name":[],"Identifier":[]}`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL)
+       c.cfg.RegistryURL = srv.URL
+       raw, err := c.GetPersonComposite("Person100099")
+       if err != nil {
+               t.Fatalf("GetPersonComposite: %v", err)
+       }
+       if !strings.Contains(string(raw), `"id":99`) {
+               t.Errorf("raw missing expected substring: %s", string(raw))
+       }
+}
+
+func TestFindCoGroupByName_ExactMatchOnly(t *testing.T) {
+       srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, 
r *http.Request) {
+               _, _ = io.WriteString(w, `{
+            "ResponseType":"CoGroups","Version":"1.0",
+            "CoGroups":[
+                {"Version":"1.0","Id":11,"CoId":2,"Name":"custos-alice"},
+                {"Version":"1.0","Id":12,"CoId":2,"Name":"custos-bob"}
+            ]
+        }`)
+       }))
+       defer srv.Close()
+
+       c := newTestClient(t, srv.URL)
+       c.cfg.RegistryURL = srv.URL
+
+       id, err := c.FindCoGroupByName("custos-bob")
+       if err != nil {
+               t.Fatalf("FindCoGroupByName: %v", err)
+       }
+       if id != 12 {
+               t.Errorf("id: got %d, want 12", id)
+       }
+       id, err = c.FindCoGroupByName("custos-nope")
+       if err != nil {
+               t.Fatalf("FindCoGroupByName miss: %v", err)
+       }
+       if id != 0 {
+               t.Errorf("miss should return 0, got %d", id)
+       }
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go
new file mode 100644
index 000000000..922cc8984
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/group_members.go
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+)
+
+type CoGroupMemberCreateRequest struct {
+       RequestType    string                   `json:"RequestType"`
+       Version        string                   `json:"Version"`
+       CoGroupMembers []CoGroupMemberCreateOne `json:"CoGroupMembers"`
+}
+
+type CoGroupMemberCreateOne struct {
+       Version   string           `json:"Version"`
+       Person    IdentifierParent `json:"Person"`
+       CoGroupId int              `json:"CoGroupId"`
+       Member    bool             `json:"Member"`
+       Owner     bool             `json:"Owner"`
+}
+
+func (c *Client) CreateCoGroupMember(coPersonId, coGroupId int) (int, error) {
+       body, err := json.Marshal(CoGroupMemberCreateRequest{
+               RequestType: "CoGroupMembers",
+               Version:     restAPIVersion,
+               CoGroupMembers: []CoGroupMemberCreateOne{{
+                       Version:   restAPIVersion,
+                       Person:    IdentifierParent{Type: "CO", Id: coPersonId},
+                       CoGroupId: coGroupId,
+                       Member:    true,
+                       Owner:     true,
+               }},
+       })
+       if err != nil {
+               return 0, fmt.Errorf("marshal co_group_member: %w", err)
+       }
+
+       u := c.restAPI("/co_group_members.json")
+       resp, respBody, err := c.Do(http.MethodPost, u, body)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusCreated, http.StatusOK:
+               var out CoGroupCreateResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode co_group_member response: 
%w: %s", err, string(respBody))
+               }
+               var id int
+               if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+                       return 0, fmt.Errorf("parse co_group_member id %q: %w", 
out.Id, err)
+               }
+               return id, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "POST", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+type CoGroupMemberListResponse struct {
+       ResponseType   string                 `json:"ResponseType"`
+       Version        string                 `json:"Version"`
+       CoGroupMembers []CoGroupMemberListOne `json:"CoGroupMembers"`
+}
+
+type CoGroupMemberListOne struct {
+       Version   string           `json:"Version"`
+       Id        int              `json:"Id"`
+       Person    IdentifierParent `json:"Person"`
+       CoGroupId int              `json:"CoGroupId"`
+       Member    bool             `json:"Member"`
+       Owner     bool             `json:"Owner"`
+}
+
+// FindCoGroupMember returns the membership id for a (group, person) pair, or
+// 0 if none.
+func (c *Client) FindCoGroupMember(coGroupId, coPersonId int) (int, error) {
+       u := 
c.restAPI(fmt.Sprintf("/co_group_members.json?cogroupid=%d&copersonid=%d", 
coGroupId, coPersonId))
+       resp, respBody, err := c.Do(http.MethodGet, u, nil)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               var out CoGroupMemberListResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode co_group_members list: 
%w", err)
+               }
+               for _, m := range out.CoGroupMembers {
+                       if m.CoGroupId == coGroupId && m.Person.Id == 
coPersonId {
+                               return m.Id, nil
+                       }
+               }
+               return 0, nil
+       case http.StatusNoContent:
+               return 0, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "GET", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/groups.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go
new file mode 100644
index 000000000..d52e52ad7
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/groups.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+)
+
+// CoGroupCreateRequest is the body envelope for POST /co_groups.json.
+// GroupType "CL" = GroupEnum::Clusters (per-user cluster primary group).
+type CoGroupCreateRequest struct {
+       RequestType string             `json:"RequestType"`
+       Version     string             `json:"Version"`
+       CoGroups    []CoGroupCreateOne `json:"CoGroups"`
+}
+
+type CoGroupCreateOne struct {
+       Version     string `json:"Version"`
+       CoId        int    `json:"CoId"`
+       Name        string `json:"Name"`
+       Description string `json:"Description"`
+       Open        bool   `json:"Open"`
+       Status      string `json:"Status"`
+       GroupType   string `json:"GroupType"`
+       Auto        bool   `json:"Auto"`
+}
+
+// CoGroupCreateResponse is the standard NewObject shape returned by COmanage
+// REST POSTs.
+type CoGroupCreateResponse struct {
+       ResponseType string `json:"ResponseType"`
+       Version      string `json:"Version"`
+       ObjectType   string `json:"ObjectType"`
+       Id           string `json:"Id"`
+}
+
+func (c *Client) CreateCoGroup(name, description string) (int, error) {
+       body, err := json.Marshal(CoGroupCreateRequest{
+               RequestType: "CoGroups",
+               Version:     restAPIVersion,
+               CoGroups: []CoGroupCreateOne{{
+                       Version:     restAPIVersion,
+                       CoId:        c.cfg.COID,
+                       Name:        name,
+                       Description: description,
+                       Open:        false,
+                       Status:      "Active",
+                       GroupType:   "CL",
+                       Auto:        false,
+               }},
+       })
+       if err != nil {
+               return 0, fmt.Errorf("marshal co_group: %w", err)
+       }
+
+       u := c.restAPI("/co_groups.json")
+       resp, respBody, err := c.Do(http.MethodPost, u, body)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusCreated, http.StatusOK:
+               var out CoGroupCreateResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode co_group create response: 
%w: %s", err, string(respBody))
+               }
+               var id int
+               if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+                       return 0, fmt.Errorf("parse co_group id %q: %w", 
out.Id, err)
+               }
+               return id, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "POST", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+type CoGroupListResponse struct {
+       ResponseType string           `json:"ResponseType"`
+       Version      string           `json:"Version"`
+       CoGroups     []CoGroupReadOne `json:"CoGroups"`
+}
+
+type CoGroupReadOne struct {
+       Version string `json:"Version"`
+       Id      int    `json:"Id"`
+       CoId    int    `json:"CoId"`
+       Name    string `json:"Name"`
+}
+
+// FindCoGroupByName returns the group id, or 0 if no group with that name
+// exists in the configured CO.
+func (c *Client) FindCoGroupByName(name string) (int, error) {
+       u := c.restAPI(fmt.Sprintf("/co_groups.json?coid=%d", c.cfg.COID))
+       resp, respBody, err := c.Do(http.MethodGet, u, nil)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               var out CoGroupListResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode co_groups list: %w", err)
+               }
+               for _, g := range out.CoGroups {
+                       if g.Name == name {
+                               return g.Id, nil
+                       }
+               }
+               return 0, nil
+       case http.StatusNoContent:
+               return 0, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "GET", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+func (c *Client) DeleteCoGroup(id int) error {
+       u := c.restAPI(fmt.Sprintf("/co_groups/%d.json", id))
+       resp, respBody, err := c.Do(http.MethodDelete, u, nil)
+       if err != nil {
+               return err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK, http.StatusNoContent:
+               return nil
+       case http.StatusUnauthorized:
+               return ErrAuth401
+       case http.StatusNotFound:
+               return ErrNotFound
+       default:
+               return &HTTPError{Method: "DELETE", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go
new file mode 100644
index 000000000..f562c2915
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/identifiers.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+)
+
+// IdentifierParent is the nested {"Type":"Group"|"CO","Id":<n>} block that
+// attaches an Identifier to a CoGroup or CoPerson.
+type IdentifierParent struct {
+       Type string `json:"Type"`
+       Id   int    `json:"Id"`
+}
+
+type IdentifierCreateRequest struct {
+       RequestType string                `json:"RequestType"`
+       Version     string                `json:"Version"`
+       Identifiers []IdentifierCreateOne `json:"Identifiers"`
+}
+
+type IdentifierCreateOne struct {
+       Version    string           `json:"Version"`
+       Identifier string           `json:"Identifier"`
+       Type       string           `json:"Type"`
+       Login      bool             `json:"Login"`
+       Status     string           `json:"Status"`
+       Person     IdentifierParent `json:"Person"`
+}
+
+// CreateIdentifierOnGroup POSTs an Identifier attached to a CoGroup.
+func (c *Client) CreateIdentifierOnGroup(value, identifierType string, 
coGroupId int) (int, error) {
+       body, err := json.Marshal(IdentifierCreateRequest{
+               RequestType: "Identifiers",
+               Version:     restAPIVersion,
+               Identifiers: []IdentifierCreateOne{{
+                       Version:    restAPIVersion,
+                       Identifier: value,
+                       Type:       identifierType,
+                       Login:      false,
+                       Status:     "Active",
+                       Person:     IdentifierParent{Type: "Group", Id: 
coGroupId},
+               }},
+       })
+       if err != nil {
+               return 0, fmt.Errorf("marshal identifier: %w", err)
+       }
+
+       u := c.restAPI("/identifiers.json")
+       resp, respBody, err := c.Do(http.MethodPost, u, body)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusCreated, http.StatusOK:
+               var out CoGroupCreateResponse // same NewObject shape
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode identifier create 
response: %w: %s", err, string(respBody))
+               }
+               var id int
+               if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+                       return 0, fmt.Errorf("parse identifier id %q: %w", 
out.Id, err)
+               }
+               return id, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "POST", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+type IdentifierListResponse struct {
+       ResponseType string              `json:"ResponseType"`
+       Version      string              `json:"Version"`
+       Identifiers  []IdentifierListOne `json:"Identifiers"`
+}
+
+type IdentifierListOne struct {
+       Version    string `json:"Version"`
+       Id         int    `json:"Id"`
+       Identifier string `json:"Identifier"`
+       Type       string `json:"Type"`
+       Status     string `json:"Status"`
+}
+
+// FindIdentifierOnGroup returns the existing Identifier id with the given
+// type on a CoGroup, or 0 if none.
+func (c *Client) FindIdentifierOnGroup(coGroupId int, identifierType string) 
(int, error) {
+       u := c.restAPI(fmt.Sprintf("/identifiers.json?cogroupid=%d", coGroupId))
+       resp, respBody, err := c.Do(http.MethodGet, u, nil)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               var out IdentifierListResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode identifiers list: %w", err)
+               }
+               for _, ident := range out.Identifiers {
+                       if ident.Type == identifierType {
+                               return ident.Id, nil
+                       }
+               }
+               return 0, nil
+       case http.StatusNoContent:
+               return 0, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "GET", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
diff --git a/connectors/COmanage/Identity-Provisioner/internal/client/people.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/people.go
new file mode 100644
index 000000000..8968c1f8a
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/people.go
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "net/url"
+)
+
+// CreatePersonResponseEntry is one Identifier echoed by POST /people. COmanage
+// returns one row per identifier type configured on the binding, typically
+// including the auto-assigned ids.
+type CreatePersonResponseEntry struct {
+       Identifier string `json:"identifier"`
+       Type       string `json:"type"`
+       Login      bool   `json:"login"`
+       Status     string `json:"status"`
+}
+
+// CreatePerson POSTs the composite body (CoPerson + Name + Email +
+// Identifier blocks) to /people and returns the resulting Identifier list.
+func (c *Client) CreatePerson(body []byte) ([]CreatePersonResponseEntry, 
error) {
+       u := c.coreAPI("/people")
+       resp, respBody, err := c.Do(http.MethodPost, u, body)
+       if err != nil {
+               return nil, err
+       }
+       switch resp.StatusCode {
+       case http.StatusCreated, http.StatusOK:
+               var out []CreatePersonResponseEntry
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return nil, fmt.Errorf("decode create-person response: 
%w: %s", err, string(respBody))
+               }
+               return out, nil
+       case http.StatusUnauthorized:
+               return nil, ErrAuth401
+       default:
+               return nil, &HTTPError{Method: "POST", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+// GetPersonComposite returns the raw composite JSON, suitable for 
round-tripping
+// back into UpdatePerson.
+func (c *Client) GetPersonComposite(identifier string) (json.RawMessage, 
error) {
+       u := c.coreAPI("/people/" + identifier)
+       resp, respBody, err := c.Do(http.MethodGet, u, nil)
+       if err != nil {
+               return nil, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               return json.RawMessage(respBody), nil
+       case http.StatusUnauthorized:
+               return nil, ErrAuth401
+       case http.StatusNotFound:
+               return nil, ErrNotFound
+       default:
+               return nil, &HTTPError{Method: "GET", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+// UpdatePerson PUTs a full-composite body. The body MUST include every related
+// model (Name, EmailAddress, Identifier, etc.) or COmanage's deleteOmitted
+// behavior will wipe them.
+func (c *Client) UpdatePerson(identifier string, body []byte) 
(json.RawMessage, error) {
+       u := c.coreAPI("/people/" + identifier)
+       resp, respBody, err := c.Do(http.MethodPut, u, body)
+       if err != nil {
+               return nil, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               return json.RawMessage(respBody), nil
+       case http.StatusUnauthorized:
+               return nil, ErrAuth401
+       case http.StatusNotFound:
+               return nil, ErrNotFound
+       default:
+               return nil, &HTTPError{Method: "PUT", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+func (c *Client) DeletePerson(identifier string) error {
+       u := c.coreAPI("/people/" + identifier)
+       resp, respBody, err := c.Do(http.MethodDelete, u, nil)
+       if err != nil {
+               return err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK, http.StatusNoContent:
+               return nil
+       case http.StatusUnauthorized:
+               return ErrAuth401
+       case http.StatusNotFound:
+               return ErrNotFound
+       default:
+               return &HTTPError{Method: "DELETE", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+type CoPersonListResponse struct {
+       ResponseType string            `json:"ResponseType"`
+       Version      string            `json:"Version"`
+       CoPeople     []CoPersonListOne `json:"CoPeople"`
+}
+
+type CoPersonListOne struct {
+       Version string `json:"Version"`
+       Id      int    `json:"Id"`
+       CoId    int    `json:"CoId"`
+}
+
+// FindCoPersonByEmail searches for CoPeople. search.mail is a LIKE match in
+// COmanage; callers must post-filter for exact equality before trusting a hit.
+func (c *Client) FindCoPersonByEmail(email string) ([]CoPersonListOne, error) {
+       u := c.restAPI(fmt.Sprintf("/co_people.json?coid=%d&search.mail=%s", 
c.cfg.COID, url.QueryEscape(email)))
+       resp, respBody, err := c.Do(http.MethodGet, u, nil)
+       if err != nil {
+               return nil, err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK:
+               var out CoPersonListResponse
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return nil, fmt.Errorf("decode co_people list: %w", err)
+               }
+               return out.CoPeople, nil
+       case http.StatusNoContent:
+               return nil, nil
+       case http.StatusUnauthorized:
+               return nil, ErrAuth401
+       case http.StatusNotFound:
+               return nil, ErrNotFound
+       default:
+               return nil, &HTTPError{Method: "GET", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go 
b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go
new file mode 100644
index 000000000..2a9eaa99d
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/client/unix_cluster.go
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package client
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+)
+
+type UnixClusterGroupCreateRequest struct {
+       RequestType       string                      `json:"RequestType"`
+       Version           string                      `json:"Version"`
+       UnixClusterGroups []UnixClusterGroupCreateOne `json:"UnixClusterGroups"`
+}
+
+type UnixClusterGroupCreateOne struct {
+       Version       string `json:"Version"`
+       UnixClusterId int    `json:"UnixClusterId"`
+       CoGroupId     int    `json:"CoGroupId"`
+}
+
+// CreateUnixClusterGroup binds a CoGroup to a UnixCluster. The URL takes no
+// named params for POST.
+func (c *Client) CreateUnixClusterGroup(coGroupId int) (int, error) {
+       body, err := json.Marshal(UnixClusterGroupCreateRequest{
+               RequestType: "UnixClusterGroups",
+               Version:     restAPIVersion,
+               UnixClusterGroups: []UnixClusterGroupCreateOne{{
+                       Version:       restAPIVersion,
+                       UnixClusterId: c.cfg.UnixClusterID,
+                       CoGroupId:     coGroupId,
+               }},
+       })
+       if err != nil {
+               return 0, fmt.Errorf("marshal unix_cluster_group: %w", err)
+       }
+
+       u := c.restAPI("/unix_cluster/unix_cluster_groups.json")
+       resp, respBody, err := c.Do(http.MethodPost, u, body)
+       if err != nil {
+               return 0, err
+       }
+       switch resp.StatusCode {
+       case http.StatusCreated, http.StatusOK:
+               var out CoGroupCreateResponse // NewObject shape
+               if err := json.Unmarshal(respBody, &out); err != nil {
+                       return 0, fmt.Errorf("decode unix_cluster_group 
response: %w: %s", err, string(respBody))
+               }
+               var id int
+               if _, err := fmt.Sscanf(out.Id, "%d", &id); err != nil {
+                       return 0, fmt.Errorf("parse unix_cluster_group id %q: 
%w", out.Id, err)
+               }
+               return id, nil
+       case http.StatusUnauthorized:
+               return 0, ErrAuth401
+       default:
+               return 0, &HTTPError{Method: "POST", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
+
+func (c *Client) DeleteUnixClusterGroup(id int) error {
+       u := c.restAPI(fmt.Sprintf("/unix_cluster/unix_cluster_groups/%d.json", 
id))
+       resp, respBody, err := c.Do(http.MethodDelete, u, nil)
+       if err != nil {
+               return err
+       }
+       switch resp.StatusCode {
+       case http.StatusOK, http.StatusNoContent:
+               return nil
+       case http.StatusUnauthorized:
+               return ErrAuth401
+       case http.StatusNotFound:
+               return ErrNotFound
+       default:
+               return &HTTPError{Method: "DELETE", URL: u, StatusCode: 
resp.StatusCode, Body: string(respBody)}
+       }
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go 
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go
new file mode 100644
index 000000000..e080acbf5
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/operations/compose.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+       "encoding/json"
+       "fmt"
+)
+
+// UnixClusterAccountBlock is appended to the composite PUT body. Field names
+// mirror COmanage's JSON attributes.
+type UnixClusterAccountBlock struct {
+       UnixClusterId    int    `json:"unix_cluster_id"`
+       SyncMode         string `json:"sync_mode"`
+       Status           string `json:"status"`
+       Username         string `json:"username"`
+       Uid              int64  `json:"uid"`
+       Gecos            string `json:"gecos"`
+       LoginShell       string `json:"login_shell"`
+       HomeDirectory    string `json:"home_directory"`
+       PrimaryCoGroupId int    `json:"primary_co_group_id"`
+}
+
+// mergeUnixClusterAccount sets "UnixClusterAccount" on the composite while
+// preserving every other top-level key.
+func mergeUnixClusterAccount(composite json.RawMessage, block 
UnixClusterAccountBlock) ([]byte, error) {
+       var top map[string]json.RawMessage
+       if err := json.Unmarshal(composite, &top); err != nil {
+               return nil, fmt.Errorf("decode composite: %w", err)
+       }
+       blockJSON, err := json.Marshal([]UnixClusterAccountBlock{block})
+       if err != nil {
+               return nil, fmt.Errorf("encode UnixClusterAccount block: %w", 
err)
+       }
+       top["UnixClusterAccount"] = blockJSON
+       out, err := json.Marshal(top)
+       if err != nil {
+               return nil, fmt.Errorf("encode merged composite: %w", err)
+       }
+       return out, nil
+}
+
+// extractIdentifier returns the first Identifier.identifier whose type 
matches,
+// or "" if none.
+func extractIdentifier(composite json.RawMessage, identifierType string) 
(string, error) {
+       var top map[string]json.RawMessage
+       if err := json.Unmarshal(composite, &top); err != nil {
+               return "", fmt.Errorf("decode composite: %w", err)
+       }
+       rawIdents, ok := top["Identifier"]
+       if !ok {
+               return "", nil
+       }
+       var idents []struct {
+               Identifier string `json:"identifier"`
+               Type       string `json:"type"`
+       }
+       if err := json.Unmarshal(rawIdents, &idents); err != nil {
+               return "", fmt.Errorf("decode Identifier array: %w", err)
+       }
+       for _, id := range idents {
+               if id.Type == identifierType {
+                       return id.Identifier, nil
+               }
+       }
+       return "", nil
+}
+
+// extractCoPersonID returns the numeric CoPerson.meta.id (distinct from the
+// PersonIDType identifier string), or 0 if missing.
+func extractCoPersonID(composite json.RawMessage) (int, error) {
+       var top struct {
+               CoPerson struct {
+                       Meta struct {
+                               Id int `json:"id"`
+                       } `json:"meta"`
+               } `json:"CoPerson"`
+       }
+       if err := json.Unmarshal(composite, &top); err != nil {
+               return 0, fmt.Errorf("decode composite for CoPerson.id: %w", 
err)
+       }
+       return top.CoPerson.Meta.Id, nil
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go 
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go
new file mode 100644
index 000000000..c7d0b12d4
--- /dev/null
+++ 
b/connectors/COmanage/Identity-Provisioner/internal/operations/compose_test.go
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+       "encoding/json"
+       "testing"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+// sampleComposite mirrors a real CoPerson composite trimmed to the fields the
+// operations layer reads.
+const sampleComposite = `{
+    "CoPerson":{"meta":{"id":98},"co_id":2,"status":"A"},
+    
"Name":[{"given":"GoalE2E","family":"Throwaway","type":"official","primary_name":true}],
+    
"EmailAddress":[{"mail":"[email protected]","type":"official","verified":false}],
+    "CoGroupMember":[
+        {"co_group_id":7,"member":true,"owner":false},
+        {"co_group_id":25,"member":true,"owner":true}
+    ],
+    "Identifier":[
+        
{"identifier":"http://test.invalid/sub","type":"oidcsub","login":true,"status":"A"},
+        
{"identifier":"Person100016","type":"comanage_id","login":false,"status":"A"},
+        
{"identifier":"100016","type":"comanage_number","login":false,"status":"A"},
+        {"identifier":"vspectes2","type":"uid","login":false,"status":"A"},
+        {"identifier":"2000016","type":"uidnumber","login":false,"status":"A"},
+        {"identifier":"2000016","type":"gidnumber","login":false,"status":"A"}
+    ]
+}`
+
+func TestExtractIdentifier(t *testing.T) {
+       tests := []struct {
+               typeName string
+               want     string
+       }{
+               {"comanage_id", "Person100016"},
+               {"uidnumber", "2000016"},
+               {"gidnumber", "2000016"},
+               {"oidcsub", "http://test.invalid/sub"},
+               {"missing-type", ""},
+       }
+       for _, tc := range tests {
+               t.Run(tc.typeName, func(t *testing.T) {
+                       got, err := extractIdentifier([]byte(sampleComposite), 
tc.typeName)
+                       if err != nil {
+                               t.Fatalf("extractIdentifier: %v", err)
+                       }
+                       if got != tc.want {
+                               t.Errorf("got %q, want %q", got, tc.want)
+                       }
+               })
+       }
+}
+
+func TestExtractCoPersonID(t *testing.T) {
+       got, err := extractCoPersonID([]byte(sampleComposite))
+       if err != nil {
+               t.Fatalf("extractCoPersonID: %v", err)
+       }
+       if got != 98 {
+               t.Errorf("got %d, want 98", got)
+       }
+}
+
+func TestMergeUnixClusterAccount_PreservesAllKeysAndAppendsBlock(t *testing.T) 
{
+       block := UnixClusterAccountBlock{
+               UnixClusterId:    1,
+               SyncMode:         "M",
+               Status:           "A",
+               Username:         "custos-gthrowa",
+               Uid:              2000016,
+               Gecos:            "",
+               LoginShell:       "/bin/bash",
+               HomeDirectory:    "/home/custos-gthrowa",
+               PrimaryCoGroupId: 25,
+       }
+       out, err := mergeUnixClusterAccount([]byte(sampleComposite), block)
+       if err != nil {
+               t.Fatalf("mergeUnixClusterAccount: %v", err)
+       }
+
+       var merged map[string]json.RawMessage
+       if err := json.Unmarshal(out, &merged); err != nil {
+               t.Fatalf("decode merged: %v", err)
+       }
+
+       for _, key := range []string{"CoPerson", "Name", "EmailAddress", 
"Identifier", "CoGroupMember", "UnixClusterAccount"} {
+               if _, ok := merged[key]; !ok {
+                       t.Errorf("merged composite missing key %q", key)
+               }
+       }
+
+       // UnixClusterAccount must be an array with one block matching what we 
sent.
+       var unix []UnixClusterAccountBlock
+       if err := json.Unmarshal(merged["UnixClusterAccount"], &unix); err != 
nil {
+               t.Fatalf("decode UnixClusterAccount: %v", err)
+       }
+       if len(unix) != 1 || unix[0].Username != "custos-gthrowa" || 
unix[0].Uid != 2000016 || unix[0].PrimaryCoGroupId != 25 {
+               t.Errorf("unix block: %+v", unix)
+       }
+}
+
+func TestBuildCreatePersonBody_Shape(t *testing.T) {
+       u := &models.User{FirstName: "GoalE2E", LastName: "Throwaway", Email: 
"[email protected]"}
+       raw, err := buildCreatePersonBody(2, u)
+       if err != nil {
+               t.Fatalf("buildCreatePersonBody: %v", err)
+       }
+       var got map[string]json.RawMessage
+       if err := json.Unmarshal(raw, &got); err != nil {
+               t.Fatalf("decode: %v", err)
+       }
+       for _, key := range []string{"CoPerson", "Name", "EmailAddress"} {
+               if _, ok := got[key]; !ok {
+                       t.Errorf("body missing %q", key)
+               }
+       }
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
 
b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
new file mode 100644
index 000000000..7c2d8c964
--- /dev/null
+++ 
b/connectors/COmanage/Identity-Provisioner/internal/operations/ensure_posix_account.go
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "fmt"
+       "log/slog"
+       "strconv"
+
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+func (o *Orchestrator) ensurePOSIXAccountImpl(ctx context.Context, cu 
*models.ComputeClusterUser) error {
+       log := slog.With("correlation_id", cu.ID, "custos_user_id", cu.UserID, 
"unix_cluster_id", o.c.Config().UnixClusterID)
+
+       user, err := o.core.GetUser(ctx, cu.UserID)
+       if err != nil {
+               return fmt.Errorf("get custos user: %w", err)
+       }
+
+       personID, composite, created, err := o.lookupOrCreateCoPerson(ctx, user)
+       if err != nil {
+               o.dlq(ctx, cu, "lookup_or_create_coperson", err)
+               return err
+       }
+       log = log.With("comanage_person_id", personID)
+       log.Info("comanage: CoPerson resolved", "created", created)
+       if created {
+               o.audit(ctx, cu, "ComanageCoPersonCreated", 
fmt.Sprintf("comanage_id=%s email=%s", personID, user.Email))
+       }
+
+       if err := o.storePersonID(ctx, cu.UserID, personID); err != nil {
+               log.Warn("comanage: failed to store CoPerson id", "err", err)
+       }
+
+       if composite == nil {
+               composite, err = o.c.GetPersonComposite(personID)
+               if err != nil {
+                       o.dlq(ctx, cu, "get_composite", err)
+                       return err
+               }
+       }
+
+       uidnumber, err := extractIdentifier(composite, "uidnumber")
+       if err != nil {
+               o.dlq(ctx, cu, "extract_uidnumber", err)
+               return err
+       }
+       if uidnumber == "" {
+               err := fmt.Errorf("uidnumber identifier missing on CoPerson 
%s", personID)
+               o.dlq(ctx, cu, "missing_uidnumber", err)
+               return err
+       }
+       uidInt, err := strconv.ParseInt(uidnumber, 10, 64)
+       if err != nil {
+               o.dlq(ctx, cu, "parse_uidnumber", err)
+               return err
+       }
+
+       coPersonID, err := extractCoPersonID(composite)
+       if err != nil || coPersonID == 0 {
+               o.dlq(ctx, cu, "extract_coperson_id", err)
+               return fmt.Errorf("extract CoPerson.meta.id: %w", err)
+       }
+
+       coGroupID, err := o.c.FindCoGroupByName(cu.LocalUsername)
+       if err != nil {
+               o.dlq(ctx, cu, "find_co_group", err)
+               return err
+       }
+       if coGroupID == 0 {
+               coGroupID, err = o.c.CreateCoGroup(cu.LocalUsername, "Primary 
group for "+cu.LocalUsername)
+               if err != nil {
+                       o.dlq(ctx, cu, "create_co_group", err)
+                       return err
+               }
+               log.Info("comanage: CoGroup created", "co_group_id", coGroupID)
+       }
+
+       if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "uid"); err != 
nil {
+               o.dlq(ctx, cu, "find_groupname_identifier", err)
+               return err
+       } else if existing == 0 {
+               if _, err := o.c.CreateIdentifierOnGroup(cu.LocalUsername, 
"uid", coGroupID); err != nil {
+                       o.dlq(ctx, cu, "create_groupname_identifier", err)
+                       return err
+               }
+       }
+
+       if existing, err := o.c.FindIdentifierOnGroup(coGroupID, "gidnumber"); 
err != nil {
+               o.dlq(ctx, cu, "find_gidnumber_identifier", err)
+               return err
+       } else if existing == 0 {
+               if _, err := o.c.CreateIdentifierOnGroup(uidnumber, 
"gidnumber", coGroupID); err != nil {
+                       o.dlq(ctx, cu, "create_gidnumber_identifier", err)
+                       return err
+               }
+       }
+
+       if existing, err := o.c.FindCoGroupMember(coGroupID, coPersonID); err 
!= nil {
+               o.dlq(ctx, cu, "find_co_group_member", err)
+               return err
+       } else if existing == 0 {
+               if _, err := o.c.CreateCoGroupMember(coPersonID, coGroupID); 
err != nil {
+                       o.dlq(ctx, cu, "create_co_group_member", err)
+                       return err
+               }
+       }
+
+       // UnixClusterGroup attach: re-attempt is the idempotency mechanism. A 
4xx
+       // here means the pair already exists; continue.
+       if _, err := o.c.CreateUnixClusterGroup(coGroupID); err != nil {
+               var httpErr *client.HTTPError
+               if !errors.As(err, &httpErr) || httpErr.StatusCode < 400 || 
httpErr.StatusCode >= 500 {
+                       o.dlq(ctx, cu, "create_unix_cluster_group", err)
+                       return err
+               }
+               log.Info("comanage: UnixClusterGroup attach returned 4xx 
(already attached)", "status", httpErr.StatusCode)
+       }
+
+       fresh, err := o.c.GetPersonComposite(personID)
+       if err != nil {
+               o.dlq(ctx, cu, "refetch_composite", err)
+               return err
+       }
+       block := UnixClusterAccountBlock{
+               UnixClusterId:    o.c.Config().UnixClusterID,
+               SyncMode:         "M",
+               Status:           "A",
+               Username:         cu.LocalUsername,
+               Uid:              uidInt,
+               Gecos:            "",
+               LoginShell:       o.c.Config().DefaultShell,
+               HomeDirectory:    o.c.Config().HomedirPrefix + cu.LocalUsername,
+               PrimaryCoGroupId: coGroupID,
+       }
+       putBody, err := mergeUnixClusterAccount(fresh, block)
+       if err != nil {
+               o.dlq(ctx, cu, "merge_composite", err)
+               return err
+       }
+       if _, err := o.c.UpdatePerson(personID, putBody); err != nil {
+               o.dlq(ctx, cu, "put_composite", err)
+               return err
+       }
+       log.Info("comanage: UnixClusterAccount attached", "username", 
cu.LocalUsername, "uid", uidInt, "co_group_id", coGroupID)
+       o.audit(ctx, cu, "ComanageClusterAccountAttached", 
fmt.Sprintf("comanage_id=%s username=%s uid=%d", personID, cu.LocalUsername, 
uidInt))
+       return nil
+}
+
+// lookupOrCreateCoPerson resolves the user's CoPerson, returning the COmanage
+// person identifier, the composite (if the GET path was used), and whether a
+// new CoPerson was created.
+func (o *Orchestrator) lookupOrCreateCoPerson(ctx context.Context, user 
*models.User) (string, json.RawMessage, bool, error) {
+       personIDType := o.c.Config().PersonIDType
+
+       if stored, err := o.findStoredPersonID(ctx, user.ID); err != nil {
+               return "", nil, false, fmt.Errorf("stored lookup: %w", err)
+       } else if stored != "" {
+               composite, err := o.c.GetPersonComposite(stored)
+               if err == nil {
+                       return stored, composite, false, nil
+               }
+               if !errors.Is(err, client.ErrNotFound) {
+                       return "", nil, false, fmt.Errorf("get composite for 
stored id: %w", err)
+               }
+               // stored id no longer resolves; fall through to email search
+       }
+
+       if user.Email != "" {
+               coPersonID, err := o.findByEmailExact(user.Email)
+               if err != nil && !errors.Is(err, client.ErrNotFound) {
+                       return "", nil, false, fmt.Errorf("email search: %w", 
err)
+               }
+               if coPersonID != 0 {
+                       composite, err := 
o.c.GetPersonComposite(strconv.Itoa(coPersonID))
+                       if err != nil {
+                               return "", nil, false, fmt.Errorf("get 
composite by numeric id: %w", err)
+                       }
+                       personID, err := extractIdentifier(composite, 
personIDType)
+                       if err != nil {
+                               return "", nil, false, fmt.Errorf("extract %s 
from composite: %w", personIDType, err)
+                       }
+                       if personID == "" {
+                               return "", nil, false, fmt.Errorf("CoPerson %d 
has no %s identifier", coPersonID, personIDType)
+                       }
+                       return personID, composite, false, nil
+               }
+       }
+
+       body, err := buildCreatePersonBody(o.c.Config().COID, user)
+       if err != nil {
+               return "", nil, false, fmt.Errorf("build create body: %w", err)
+       }
+       resp, err := o.c.CreatePerson(body)
+       if err != nil {
+               return "", nil, false, fmt.Errorf("create coperson: %w", err)
+       }
+       for _, r := range resp {
+               if r.Type == personIDType {
+                       return r.Identifier, nil, true, nil
+               }
+       }
+       return "", nil, false, fmt.Errorf("POST /people returned no %s 
identifier: %+v", personIDType, resp)
+}
+
+func buildCreatePersonBody(coID int, user *models.User) ([]byte, error) {
+       body := map[string]interface{}{
+               "CoPerson": map[string]interface{}{
+                       "co_id":  coID,
+                       "status": "A",
+               },
+               "Name": []map[string]interface{}{{
+                       "given":        user.FirstName,
+                       "family":       user.LastName,
+                       "type":         "official",
+                       "primary_name": true,
+                       "language":     "en",
+               }},
+               "EmailAddress": []map[string]interface{}{{
+                       "mail":     user.Email,
+                       "type":     "official",
+                       "verified": false,
+               }},
+       }
+       return json.Marshal(body)
+}
+
+func (o *Orchestrator) audit(ctx context.Context, cu 
*models.ComputeClusterUser, eventType, details string) {
+       _, _ = o.core.CreateAuditEvent(ctx, &models.AuditEvent{
+               EventType: eventType,
+               EntityID:  cu.ID,
+               Details:   details,
+       })
+}
+
+func (o *Orchestrator) dlq(ctx context.Context, cu *models.ComputeClusterUser, 
step string, err error) {
+       details := fmt.Sprintf("step=%s err=%v", step, err)
+       _, _ = o.core.CreateAuditEvent(ctx, &models.AuditEvent{
+               EventType: "ComanageProvisioningFailed",
+               EntityID:  cu.ID,
+               Details:   details,
+       })
+       // TODO: admin endpoint + CLI to re-fire ComputeClusterUserCreateEvent 
for a
+       // specific user, and to clean up orphan CoGroups from terminal 
dead-letters.
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go 
b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go
new file mode 100644
index 000000000..cb851a28b
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/internal/operations/lookup.go
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package operations
+
+import (
+       "context"
+       "errors"
+       "fmt"
+
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+       "github.com/apache/airavata-custos/pkg/models"
+       _ "github.com/apache/airavata-custos/pkg/service"
+)
+
+var ErrNotFoundShim = client.ErrNotFound
+
+const comanageIdentitySource = "comanage"
+
+func (o *Orchestrator) findStoredPersonID(ctx context.Context, userID string) 
(string, error) {
+       idents, err := o.core.ListUserIdentitiesForUser(ctx, userID)
+       if err != nil {
+               return "", fmt.Errorf("list user identities: %w", err)
+       }
+       for _, id := range idents {
+               if id.Source == comanageIdentitySource && id.ExternalID != "" {
+                       return id.ExternalID, nil
+               }
+       }
+       return "", nil
+}
+
+// storePersonID writes the COmanage CoPerson identifier into user_identities.
+// No-op if a row already exists.
+func (o *Orchestrator) storePersonID(ctx context.Context, userID, personID 
string) error {
+       if existing, err := o.findStoredPersonID(ctx, userID); err != nil {
+               return err
+       } else if existing != "" {
+               return nil
+       }
+       _, err := o.core.CreateUserIdentity(ctx, &models.UserIdentity{
+               UserID:     userID,
+               Source:     comanageIdentitySource,
+               ExternalID: personID,
+       })
+       return err
+}
+
+// findByEmailExact searches by email and returns the first match in the
+// configured CO, or 0 if none. COmanage's search.mail is a LIKE match: callers
+// trading off precision for one round-trip should be aware that very similar
+// emails could collide.
+func (o *Orchestrator) findByEmailExact(email string) (int, error) {
+       if email == "" {
+               return 0, nil
+       }
+       candidates, err := o.c.FindCoPersonByEmail(email)
+       if err != nil {
+               if errors.Is(err, ErrNotFoundShim) {
+                       return 0, nil
+               }
+               return 0, err
+       }
+       for _, p := range candidates {
+               if p.CoId == o.c.Config().COID {
+                       return p.Id, nil
+               }
+       }
+       return 0, nil
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go 
b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go
new file mode 100644
index 000000000..f48c062dc
--- /dev/null
+++ 
b/connectors/COmanage/Identity-Provisioner/internal/operations/orchestrator.go
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package operations provisions POSIX identity in COmanage for a
+// (CoPerson, UnixCluster) pair.
+package operations
+
+import (
+       "context"
+
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+       "github.com/apache/airavata-custos/pkg/models"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+type Orchestrator struct {
+       c    *client.Client
+       core *service.Service
+}
+
+func New(c *client.Client, core *service.Service) *Orchestrator {
+       return &Orchestrator{c: c, core: core}
+}
+
+func (o *Orchestrator) EnsurePOSIXAccount(ctx context.Context, cu 
*models.ComputeClusterUser) error {
+       return o.ensurePOSIXAccountImpl(ctx, cu)
+}
diff --git 
a/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go 
b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go
new file mode 100644
index 000000000..a3fac2901
--- /dev/null
+++ 
b/connectors/COmanage/Identity-Provisioner/internal/subscribers/cluster_user.go
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package subscribers
+
+import (
+       "context"
+       "log/slog"
+
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/operations"
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/models"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+// ClusterUserSubscriber listens for ComputeClusterUserCreateEvent and drives
+// the orchestrator. Events whose ComputeClusterID does not match
+// CustosClusterID are dropped.
+type ClusterUserSubscriber struct {
+       ops             *operations.Orchestrator
+       bus             *events.Bus
+       core            *service.Service
+       custosClusterID string
+}
+
+func NewClusterUserSubscriber(c *client.Client, bus *events.Bus, core 
*service.Service, custosClusterID string) *ClusterUserSubscriber {
+       return &ClusterUserSubscriber{
+               ops:             operations.New(c, core),
+               bus:             bus,
+               core:            core,
+               custosClusterID: custosClusterID,
+       }
+}
+
+func (s *ClusterUserSubscriber) RegisterSubscribers() {
+       s.bus.Subscribe(events.ComputeClusterUserCreateEvent, 
s.handleClusterUserCreate)
+}
+
+func (s *ClusterUserSubscriber) handleClusterUserCreate(_ events.Event, 
payload interface{}) {
+       cu, ok := payload.(*models.ComputeClusterUser)
+       if !ok {
+               slog.Error("comanage subscriber: payload is not 
*ComputeClusterUser", "type", payload)
+               return
+       }
+       if cu.ComputeClusterID != s.custosClusterID {
+               return
+       }
+       ctx := context.Background()
+       // TODO: move to a transactional scope. In-process delivery loses events
+       // if the process crashes between the core commit and subscriber pickup.
+       if err := s.ops.EnsurePOSIXAccount(ctx, cu); err != nil {
+               slog.Error("comanage subscriber: EnsurePOSIXAccount failed",
+                       "compute_cluster_user_id", cu.ID, "user_id", cu.UserID, 
"err", err)
+       }
+}
diff --git a/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go 
b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
new file mode 100644
index 000000000..9c9628dce
--- /dev/null
+++ b/connectors/COmanage/Identity-Provisioner/pkg/comanage/loader.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package comanage is the COmanage Identity-Provisioner entry point. Wired
+// from internal/connectors/loader.go.
+package comanage
+
+import (
+       "context"
+       "log/slog"
+       "os"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/jmoiron/sqlx"
+
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/client"
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/internal/subscribers"
+       "github.com/apache/airavata-custos/pkg/events"
+       "github.com/apache/airavata-custos/pkg/service"
+)
+
+// LoadConnector wires the subscriber to the event bus. If any required env
+// var is missing, the loader logs and returns nil without registering.
+func LoadConnector(_ context.Context, _ *sqlx.DB, eventBus *events.Bus, 
coreService *service.Service, _ *sync.WaitGroup) error {
+       cfg, ok := loadConfigFromEnv()
+       if !ok {
+               slog.Info("comanage provisioner: required env vars not set; 
skipping")
+               return nil
+       }
+       httpClient := client.New(cfg)
+       subscribers.NewClusterUserSubscriber(httpClient, eventBus, coreService, 
cfg.CustosClusterID).RegisterSubscribers()
+       slog.Info("comanage provisioner: subscriber registered",
+               "registry", cfg.RegistryURL, "co_id", cfg.COID, "cluster_id", 
cfg.CustosClusterID)
+       // Custos is the source of truth: CoPerson and UnixClusterAccount 
records
+       // for users provisioned via this connector must not be edited directly 
in
+       // COmanage. There is no drift reconciliation; out-of-band edits will be
+       // invisible to Custos.
+       return nil
+}
+
+func loadConfigFromEnv() (client.Config, bool) {
+       registryURL := os.Getenv("COMANAGE_REGISTRY_URL")
+       coIDStr := os.Getenv("COMANAGE_CO_ID")
+       apiUser := os.Getenv("COMANAGE_API_USER")
+       apiKey := os.Getenv("COMANAGE_API_KEY")
+       personIDType := os.Getenv("COMANAGE_PERSON_ID_TYPE")
+       unixClusterStr := os.Getenv("COMANAGE_UNIX_CLUSTER_ID")
+       custosCluster := os.Getenv("CUSTOS_CLUSTER_ID")
+
+       if registryURL == "" || coIDStr == "" || apiUser == "" || apiKey == "" 
|| personIDType == "" || unixClusterStr == "" || custosCluster == "" {
+               return client.Config{}, false
+       }
+       coID, err := strconv.Atoi(coIDStr)
+       if err != nil {
+               slog.Error("comanage provisioner: COMANAGE_CO_ID is not an 
int", "value", coIDStr, "err", err)
+               return client.Config{}, false
+       }
+       unixClusterID, err := strconv.Atoi(unixClusterStr)
+       if err != nil {
+               slog.Error("comanage provisioner: COMANAGE_UNIX_CLUSTER_ID is 
not an int", "value", unixClusterStr, "err", err)
+               return client.Config{}, false
+       }
+
+       timeout := 30 * time.Second
+       if v := os.Getenv("COMANAGE_HTTP_TIMEOUT"); v != "" {
+               if d, err := time.ParseDuration(v); err == nil {
+                       timeout = d
+               }
+       }
+
+       defaultShell := os.Getenv("COMANAGE_DEFAULT_SHELL")
+       if defaultShell == "" {
+               defaultShell = "/bin/bash"
+       }
+       homedirPrefix := os.Getenv("COMANAGE_HOMEDIR_PREFIX")
+       if homedirPrefix == "" {
+               homedirPrefix = "/home/"
+       }
+
+       return client.Config{
+               RegistryURL:     registryURL,
+               COID:            coID,
+               APIUser:         apiUser,
+               APIKey:          apiKey,
+               PersonIDType:    personIDType,
+               UnixClusterID:   unixClusterID,
+               CustosClusterID: custosCluster,
+               DefaultShell:    defaultShell,
+               HomedirPrefix:   homedirPrefix,
+               HTTPTimeout:     timeout,
+       }, true
+}
diff --git a/internal/connectors/loader.go b/internal/connectors/loader.go
index 99fb221c0..ea070208c 100644
--- a/internal/connectors/loader.go
+++ b/internal/connectors/loader.go
@@ -25,6 +25,7 @@ import (
        "github.com/jmoiron/sqlx"
 
        
"github.com/apache/airavata-custos/connectors/ACCESS/AMIE-Processor/pkg/amie"
+       
"github.com/apache/airavata-custos/connectors/COmanage/Identity-Provisioner/pkg/comanage"
        
"github.com/apache/airavata-custos/connectors/SLURM/Association-Mapper/pkg/smapper"
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/service"
@@ -45,6 +46,12 @@ func LoadConnectors(ctx context.Context, database *sqlx.DB, 
eventBus *events.Bus
                return err
        }
 
+       slog.Info("loading COmanage Identity-Provisioner connector")
+       if err := comanage.LoadConnector(ctx, database, eventBus, coreService, 
wg); err != nil {
+               slog.Error("failed to load COmanage Identity-Provisioner 
connector", "error", err)
+               return err
+       }
+
        slog.Info("finished loading connectors")
        return nil
 }
diff --git a/internal/db/migrations/000002_compute_clusters.up.sql 
b/internal/db/migrations/000002_compute_clusters.up.sql
index f0c268482..c150758db 100644
--- a/internal/db/migrations/000002_compute_clusters.up.sql
+++ b/internal/db/migrations/000002_compute_clusters.up.sql
@@ -38,6 +38,7 @@ CREATE TABLE IF NOT EXISTS compute_cluster_users
     updated_at         TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON 
UPDATE CURRENT_TIMESTAMP(6),
     PRIMARY KEY (id),
     UNIQUE KEY uq_compute_cluster_users_pair (compute_cluster_id, user_id),
+    UNIQUE KEY uq_compute_cluster_users_local_username (compute_cluster_id, 
local_username),
     KEY idx_compute_cluster_users_user (user_id),
     CONSTRAINT fk_compute_cluster_users_cluster FOREIGN KEY 
(compute_cluster_id) REFERENCES compute_clusters (id) ON DELETE CASCADE,
     CONSTRAINT fk_compute_cluster_users_user FOREIGN KEY (user_id) REFERENCES 
users (id) ON DELETE CASCADE
diff --git a/pkg/posix/username.go b/pkg/posix/username.go
new file mode 100644
index 000000000..e5fbd34c9
--- /dev/null
+++ b/pkg/posix/username.go
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package posix builds and validates POSIX-conformant usernames for HPC
+// account provisioning.
+package posix
+
+import (
+       "errors"
+       "fmt"
+       "os"
+       "strings"
+       "unicode"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+const MaxCollisionSuffix = 999
+
+var ErrUnbuildableUsername = errors.New("posix: cannot build username from 
empty first and last name")
+
+// BuildBase returns the unsuffixed username. 'truncated' is set when the name
+// portion was shortened to fit the 32-char POSIX login cap.
+func BuildBase(u *models.User, prefix string) (string, bool, error) {
+       first := Normalize(u.FirstName)
+       last := Normalize(u.LastName)
+
+       var local string
+       switch {
+       case first != "" && last != "":
+               local = string(first[0]) + last
+       case last != "":
+               local = last
+       case first != "":
+               local = first
+       default:
+               return "", false, fmt.Errorf("%w: user %q (first=%q last=%q)", 
ErrUnbuildableUsername, u.ID, u.FirstName, u.LastName)
+       }
+
+       // 32 = POSIX login cap; -1 separator, -3 reserved for collision suffix 
(up to "999").
+       maxLocal := 32 - len(prefix) - 1 - 3
+       truncated := false
+       if len(local) > maxLocal {
+               local = local[:maxLocal]
+               truncated = true
+       }
+       return prefix + "-" + local, truncated, nil
+}
+
+func Normalize(s string) string {
+       s = strings.ToLower(s)
+       var b strings.Builder
+       for _, r := range s {
+               if r > unicode.MaxASCII {
+                       continue
+               }
+               if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') {
+                       b.WriteRune(r)
+               }
+       }
+       return b.String()
+}
+
+func Prefix() string {
+       if v := os.Getenv("POSIX_USERNAME_PREFIX"); v != "" {
+               return v
+       }
+       return "custos"
+}
diff --git a/pkg/posix/username_test.go b/pkg/posix/username_test.go
new file mode 100644
index 000000000..57b4f28f4
--- /dev/null
+++ b/pkg/posix/username_test.go
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package posix
+
+import (
+       "errors"
+       "strings"
+       "testing"
+
+       "github.com/apache/airavata-custos/pkg/models"
+)
+
+func TestBuildBase(t *testing.T) {
+       tests := []struct {
+               name      string
+               first     string
+               middle    string
+               last      string
+               prefix    string
+               wantBase  string
+               wantTrunc bool
+       }{
+               {
+                       name:  "standard",
+                       first: "Alice", last: "Smith", prefix: "custos",
+                       wantBase: "custos-asmith", wantTrunc: false,
+               },
+               {
+                       name:  "middle ignored",
+                       first: "Alice", middle: "Marie", last: "Smith", prefix: 
"custos",
+                       wantBase: "custos-asmith", wantTrunc: false,
+               },
+               {
+                       name:  "single name as last",
+                       first: "", last: "Madonna", prefix: "custos",
+                       wantBase: "custos-madonna", wantTrunc: false,
+               },
+               {
+                       name:  "first only",
+                       first: "Alice", last: "", prefix: "custos",
+                       wantBase: "custos-alice", wantTrunc: false,
+               },
+               {
+                       name:  "non-ASCII stripped",
+                       first: "Aña", last: "Şəkili", prefix: "custos",
+                       wantBase: "custos-akili", wantTrunc: false,
+               },
+               {
+                       name:  "prefix swap",
+                       first: "Alice", last: "Smith", prefix: "nexus",
+                       wantBase: "nexus-asmith", wantTrunc: false,
+               },
+               {
+                       name:  "truncation at 32-char limit",
+                       first: "L", last: strings.Repeat("a", 50), prefix: 
"custos",
+                       wantTrunc: true,
+               },
+       }
+
+       for _, tc := range tests {
+               t.Run(tc.name, func(t *testing.T) {
+                       u := &models.User{FirstName: tc.first, MiddleName: 
tc.middle, LastName: tc.last}
+                       got, trunc, err := BuildBase(u, tc.prefix)
+                       if err != nil {
+                               t.Fatalf("BuildBase: %v", err)
+                       }
+
+                       if trunc != tc.wantTrunc {
+                               t.Errorf("truncated = %v, want %v", trunc, 
tc.wantTrunc)
+                       }
+                       if tc.wantBase != "" && got != tc.wantBase {
+                               t.Errorf("base = %q, want %q", got, tc.wantBase)
+                       }
+                       if len(got) > 32 {
+                               t.Errorf("base len %d > 32: %q", len(got), got)
+                       }
+                       if !strings.HasPrefix(got, tc.prefix+"-") {
+                               t.Errorf("base %q does not start with prefix 
%q", got, tc.prefix+"-")
+                       }
+               })
+       }
+}
+
+func TestBuildBase_UnbuildableReturnsError(t *testing.T) {
+       cases := []struct {
+               name  string
+               first string
+               last  string
+       }{
+               {"both empty", "", ""},
+               {"both non-ASCII only", "李", "王"},
+               {"both punctuation only", "...", "---"},
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       u := &models.User{ID: "u-1", FirstName: tc.first, 
LastName: tc.last}
+                       got, _, err := BuildBase(u, "custos")
+                       if !errors.Is(err, ErrUnbuildableUsername) {
+                               t.Fatalf("err = %v, want 
ErrUnbuildableUsername", err)
+                       }
+                       if got != "" {
+                               t.Errorf("expected empty username on error, got 
%q", got)
+                       }
+               })
+       }
+}
+
+func TestNormalize(t *testing.T) {
+       tests := []struct {
+               in   string
+               want string
+       }{
+               {"Alice", "alice"},
+               {"ALLCAPS", "allcaps"},
+               {"abc123", "abc123"},
+               {"Aña", "aa"},
+               {"Şəkili", "kili"},
+               {"hello-world", "helloworld"},
+               {"O'Brien", "obrien"},
+               {"", ""},
+       }
+       for _, tc := range tests {
+               t.Run(tc.in, func(t *testing.T) {
+                       got := Normalize(tc.in)
+                       if got != tc.want {
+                               t.Errorf("Normalize(%q) = %q, want %q", tc.in, 
got, tc.want)
+                       }
+               })
+       }
+}
+
+func TestPrefix(t *testing.T) {
+       t.Setenv("POSIX_USERNAME_PREFIX", "")
+       if got := Prefix(); got != "custos" {
+               t.Errorf("default = %q, want %q", got, "custos")
+       }
+       t.Setenv("POSIX_USERNAME_PREFIX", "nexus")
+       if got := Prefix(); got != "nexus" {
+               t.Errorf("override = %q, want %q", got, "nexus")
+       }
+}
diff --git a/pkg/service/compute_cluster_user.go 
b/pkg/service/compute_cluster_user.go
index b8311bbf4..0aeca1f27 100644
--- a/pkg/service/compute_cluster_user.go
+++ b/pkg/service/compute_cluster_user.go
@@ -21,6 +21,7 @@ import (
        "context"
        "database/sql"
        "fmt"
+       "strings"
 
        "github.com/apache/airavata-custos/pkg/events"
        "github.com/apache/airavata-custos/pkg/models"
@@ -67,13 +68,29 @@ func (s *Service) CreateComputeClusterUser(ctx 
context.Context, cu *models.Compu
        if err := s.inTx(ctx, func(tx *sql.Tx) error {
                return s.clusterUsers.Create(ctx, tx, cu)
        }); err != nil {
-               return nil, fmt.Errorf("create compute cluster user: %w", err)
+               switch {
+               case isLocalUsernameDuplicate(err):
+                       return nil, fmt.Errorf("%w: %s", ErrAlreadyExists, 
cu.LocalUsername)
+               case isPairDuplicate(err):
+                       return nil, fmt.Errorf("%w: user %q is already mapped 
on cluster %q",
+                               ErrAlreadyExists, cu.UserID, 
cu.ComputeClusterID)
+               default:
+                       return nil, fmt.Errorf("create compute cluster user: 
%w", err)
+               }
        }
 
        s.eventBus.Publish(events.ComputeClusterUserCreateEvent, cu)
        return cu, nil
 }
 
+func isLocalUsernameDuplicate(err error) bool {
+       return err != nil && strings.Contains(err.Error(), 
"uq_compute_cluster_users_local_username")
+}
+
+func isPairDuplicate(err error) bool {
+       return err != nil && strings.Contains(err.Error(), 
"uq_compute_cluster_users_pair")
+}
+
 // GetComputeClusterUser retrieves a compute-cluster user by its ID.
 func (s *Service) GetComputeClusterUser(ctx context.Context, id string) 
(*models.ComputeClusterUser, error) {
        c, err := s.clusterUsers.FindByID(ctx, id)

Reply via email to