This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5629-3dab771a2fe3ea5bf97c4c69cfbd761f9cd01e54
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 2cdb1fe20c2fef594e0379e31b349fb4f2899475
Author: ali risheh <[email protected]>
AuthorDate: Mon Jun 15 14:59:46 2026 -0700

    fix(access-control-service): include port in computing unit pod URI and use 
Envoy Gateway for distributed CUs (#5629)
    
    ### What changes were proposed in this PR?
    
    Make the in-cluster address of a computing unit come from a single
    source of truth — the URI recorded when its pod is created — and ensure
    that URI is complete (includes the port). This lets the gateway route a
    user to a computing unit located **anywhere it can reach** (in the local
    cluster, another cluster, or an external host), instead of being limited
    to a reconstructed in-cluster address. See #5630.
    
    Two related changes:
    
    **1. Include the port in the generated pod URI**
    (`computing-unit-managing-service`)
    
    `KubernetesClient.generatePodURI` builds the address stored as the
    computing unit's `uri` (via `setUri` in `ComputingUnitManagingResource`)
    and returned to clients as `nodeAddresses`. The pod's container listens
    on `KubernetesConfig.computeUnitPortNumber` (declared with
    `withContainerPort(...)` in the same file), but the generated URI
    omitted the port, so the persisted address was not directly connectable.
    The port is now appended:
    
    ```scala
    s"...svc.cluster.local:${KubernetesConfig.computeUnitPortNumber}"
    ```
    
    **2. Route using the recorded URI** (`access-control-service`)
    
    `AccessControlResource` rebuilt the computing unit's address from
    `KubernetesConfig` on every authorization request, duplicating the
    construction logic in `generatePodURI` and pinning every CU to the local
    cluster. It now reads the URI recorded for the unit and returns it as
    the `Host` for the gateway to route to. If no URI has been recorded, the
    unit is not routable and the request is **refused with `403`** (no
    in-cluster fallback, per review).
    
    ### Routing flow
    
    The access-control service is the gateway's external authorizer; the
    `Host` it returns is the upstream Envoy forwards the (upgraded)
    connection to. Because that host comes from the unit's recorded URI, the
    same gateway can reach computing units in different locations:
    
    ```mermaid
    flowchart LR
        FE["Frontend<br/>(/wsapi?cuid=N)"] --> GW["Envoy Gateway"]
        GW -. "ext-auth: authorize + get Host" .-> ACS["access-control-service"]
        ACS -- "read recorded uri for CU N" --> DB[("workflow_computing_unit")]
        ACS -- "Host = recorded uri<br/>(or 403 if none)" --> GW
        GW == "dynamic forward proxy<br/>to returned Host" ==> R{Where the CU lives}
        R --> CU1["In-cluster CU pod<br/>computing-unit-N...svc.cluster.local:port"]
        R --> CU2["CU in another cluster"]
        R --> CU3["External / remote CU host:port"]
    ```
    
    ### Any related issues, documentation, discussions?
    
    - Closes #5630.
    - Builds on the Envoy Gateway / ext-auth routing introduced in #4191
    (unified Envoy Gateway) and #3598 (access-control-service as the
    ext-auth service for computing-unit traffic).
    
    ### How was this PR tested?
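    On a live deployment. The accompanying `AccessControlResourceSpec` changes
    also cover routing to the recorded URI and the two `403` refusal cases
    (no URI recorded, blank URI).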
    <img width="1835" height="960" alt="Screenshot from 2026-06-13 13-31-00"
    src="https://github.com/user-attachments/assets/d56a48f9-b99d-4d36-827a-0a4ce54995fd"
    />
    
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Claude Opus 4.8)
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../service/resource/AccessControlResource.scala   | 29 +++++++---
 .../apache/texera/AccessControlResourceSpec.scala  | 65 ++++++++++++++++++++--
 .../texera/service/util/KubernetesClient.scala     |  2 +-
 3 files changed, 82 insertions(+), 14 deletions(-)

diff --git 
a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala
 
b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala
index 4f1691287f..792a0dfd8a 100644
--- 
a/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala
+++ 
b/access-control-service/src/main/scala/org/apache/texera/service/resource/AccessControlResource.scala
@@ -27,8 +27,10 @@ import jakarta.ws.rs.{Consumes, DELETE, GET, POST, PUT, 
Path, Produces}
 import org.apache.texera.auth.JwtParser.parseToken
 import org.apache.texera.auth.SessionUser
 import org.apache.texera.auth.util.{ComputingUnitAccess, HeaderField}
-import org.apache.texera.common.config.{GuiConfig, KubernetesConfig, LLMConfig}
+import org.apache.texera.common.config.{GuiConfig, LLMConfig}
+import org.apache.texera.dao.SqlServer
 import org.apache.texera.dao.jooq.generated.enums.PrivilegeEnum
+import 
org.apache.texera.dao.jooq.generated.tables.daos.WorkflowComputingUnitDao
 
 import java.net.URLDecoder
 import java.nio.charset.StandardCharsets
@@ -136,12 +138,25 @@ object AccessControlResource extends LazyLogging {
     }
 
     // Dynamic Routing Logic
-    val workflowComputingUnitPoolName = KubernetesConfig.computeUnitPoolName
-    val workflowComputingUnitPoolNamespace = 
KubernetesConfig.computeUnitPoolNamespace
-    val workflowComputingUnitPoolPort = KubernetesConfig.computeUnitPortNumber
-
-    val targetHost =
-      
s"computing-unit-$cuidInt.$workflowComputingUnitPoolName-svc.$workflowComputingUnitPoolNamespace.svc.cluster.local:$workflowComputingUnitPoolPort"
+    // Route to the URI recorded for the computing unit (written by the 
managing
+    // service when the pod is created). This recorded URI is the single source
+    // of truth for where the unit is reachable, allowing units to live 
anywhere
+    // the gateway can route to. If no URI has been recorded, the unit is not
+    // routable and the connection is refused.
+    val cuDao = new WorkflowComputingUnitDao(
+      SqlServer.getInstance().createDSLContext().configuration()
+    )
+    val unit = cuDao.fetchOneByCuid(cuidInt)
+    val recordedUri = Option(unit).flatMap(u => 
Option(u.getUri)).map(_.trim).filter(_.nonEmpty)
+
+    val targetHost = recordedUri match {
+      case Some(uri) =>
+        logger.info(s"Routing CU $cuidInt to recorded host: $uri")
+        uri
+      case None =>
+        logger.warn(s"Refusing CU $cuidInt: no URI recorded for the computing 
unit")
+        return Response.status(Response.Status.FORBIDDEN).build()
+    }
 
     Response
       .ok()
diff --git 
a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala
 
b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala
index 3dfe81d89d..365f5f885f 100644
--- 
a/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala
+++ 
b/access-control-service/src/test/scala/org/apache/texera/AccessControlResourceSpec.scala
@@ -55,6 +55,11 @@ class AccessControlResourceSpec
   private val testURI: String = "http://localhost:8080/";
   private val testPath: String = "/api/executions/1/stats/1"
 
+  // The host:port the managing service records for a computing unit when it
+  // creates the pod. The access-control-service routes to this recorded URI.
+  private val testRecordedUri: String =
+    "computing-unit-2.compute-unit-svc.default.svc.cluster.local:8888"
+
   private val testUser1: User = {
     val user = new User()
     user.setUid(1)
@@ -81,6 +86,31 @@ class AccessControlResourceSpec
     cu.setType(WorkflowComputingUnitTypeEnum.kubernetes)
     cu.setCuid(2)
     cu.setName("test-cu")
+    cu.setUri(testRecordedUri)
+    cu
+  }
+
+  // A computing unit the user can access but for which no URI was ever 
recorded
+  // (e.g. the pod was never created). Such a unit is not routable and must be
+  // refused.
+  private val testCUNoUri: WorkflowComputingUnit = {
+    val cu = new WorkflowComputingUnit()
+    cu.setUid(2)
+    cu.setType(WorkflowComputingUnitTypeEnum.kubernetes)
+    cu.setCuid(3)
+    cu.setName("test-cu-no-uri")
+    cu
+  }
+
+  // A computing unit whose recorded URI is blank/whitespace-only — also 
treated
+  // as "no URI recorded" and refused.
+  private val testCUBlankUri: WorkflowComputingUnit = {
+    val cu = new WorkflowComputingUnit()
+    cu.setUid(2)
+    cu.setType(WorkflowComputingUnitTypeEnum.kubernetes)
+    cu.setCuid(4)
+    cu.setName("test-cu-blank-uri")
+    cu.setUri("   ")
     cu
   }
 
@@ -96,12 +126,18 @@ class AccessControlResourceSpec
     userDao.insert(testUser1)
     userDao.insert(testUser2)
     computingUnitDao.insert(testCU)
-
-    val cuAccess = new ComputingUnitUserAccess()
-    cuAccess.setUid(testUser1.getUid)
-    cuAccess.setCuid(testCU.getCuid)
-    cuAccess.setPrivilege(PrivilegeEnum.WRITE)
-    computingUnitOfUserDao.insert(cuAccess)
+    computingUnitDao.insert(testCUNoUri)
+    computingUnitDao.insert(testCUBlankUri)
+
+    // Grant testUser1 WRITE access to every test computing unit so the routing
+    // logic (not the access check) is what each routing test exercises.
+    Seq(testCU, testCUNoUri, testCUBlankUri).foreach { cu =>
+      val cuAccess = new ComputingUnitUserAccess()
+      cuAccess.setUid(testUser1.getUid)
+      cuAccess.setCuid(cu.getCuid)
+      cuAccess.setPrivilege(PrivilegeEnum.WRITE)
+      computingUnitOfUserDao.insert(cuAccess)
+    }
 
     val claims = JwtAuth.jwtClaims(testUser1, 1)
     token = JwtAuth.jwtToken(claims)
@@ -232,6 +268,23 @@ class AccessControlResourceSpec
     response.getHeaderString(HeaderField.UserId) shouldBe 
testUser1.getUid.toString
     response.getHeaderString(HeaderField.UserName) shouldBe testUser1.getName
     response.getHeaderString(HeaderField.UserEmail) shouldBe testUser1.getEmail
+    // Envoy routes by the rewritten Host header, which must be the URI 
recorded
+    // for the computing unit.
+    response.getHeaderString("Host") shouldBe testRecordedUri
+  }
+
+  it should "refuse the connection when no URI is recorded for the computing 
unit" in {
+    val (uri, headers) = mockRequest(testPath, 
Some(testCUNoUri.getCuid.toString))
+    val response = new AccessControlResource().authorizeGet(uri, headers)
+
+    response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode
+  }
+
+  it should "refuse the connection when the recorded URI is blank" in {
+    val (uri, headers) = mockRequest(testPath, 
Some(testCUBlankUri.getCuid.toString))
+    val response = new AccessControlResource().authorizeGet(uri, headers)
+
+    response.getStatus shouldBe Response.Status.FORBIDDEN.getStatusCode
   }
 
   private def mockRequest(
diff --git 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala
 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala
index 5177ebaf47..4f1d391cb3 100644
--- 
a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala
+++ 
b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/util/KubernetesClient.scala
@@ -35,7 +35,7 @@ object KubernetesClient {
   private val podNamePrefix = "computing-unit"
 
   def generatePodURI(cuid: Int): String = {
-    
s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local"
+    
s"${generatePodName(cuid)}.${KubernetesConfig.computeUnitServiceName}.$namespace.svc.cluster.local:${KubernetesConfig.computeUnitPortNumber}"
   }
 
   def generatePodName(cuid: Int): String = s"$podNamePrefix-$cuid"

Reply via email to