This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-5205-9b850e2ed8927a28cf4842d8534d4cb3f0ab01b7
in repository https://gitbox.apache.org/repos/asf/texera.git

commit 79a1d170b7bcf803f606f81fdc93d15bbc64fb99
Author: Yicong Huang <[email protected]>
AuthorDate: Mon May 25 16:27:30 2026 -0700

    test(amber): cover WorkflowExecutionsResource static helpers (#5205)
    
    ### What changes were proposed in this PR?
    
    Extends `WorkflowExecutionsResourceSpec` to cover the companion-object
    helpers that the JAX-RS endpoints lean on. The existing spec covered two
    of them; this PR adds cases for status-filtered execution listing,
    latest-execution lookup by `(wid, cuid)`, the expired-execution scan,
    `insertOperatorExecutions`, `updateRuntimeStatsUri` (including the
    no-match branch), the per-eid URI fetchers
    (`getResultUrisByExecutionId`, `getConsoleMessagesUriByExecutionId`,
    `getRuntimeStatsUriByExecutionId`) with their null/empty filtering,
    `deleteConsoleMessageAndExecutionResultUris`, the DB-delete branch of
    `removeAllExecutionFiles`, `updateResultSize`, the VFS-decoding logic in
    `getResultUriByLogicalPortId`, and the private BFS-based
    `getNonDownloadableOperatorMap` (via `PrivateMethodTester`).
    
    Cleanup is extended to `OPERATOR_PORT_EXECUTIONS`,
    `OPERATOR_EXECUTIONS`, `DATASET`, and `WORKFLOW_COMPUTING_UNIT` so
    seeded rows don't leak between cases.
    
    No production code is touched.
    
    ### Any related issues, documentation, discussions?
    
    Closes #5204.
    
    ### How was this PR tested?
    
    Added unit tests for the helpers listed above.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Opus 4.7
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../workflow/WorkflowExecutionsResourceSpec.scala  | 583 ++++++++++++++++++++-
 1 file changed, 555 insertions(+), 28 deletions(-)

diff --git 
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
 
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
index bd55124a72..03adc82c96 100644
--- 
a/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
+++ 
b/amber/src/test/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResourceSpec.scala
@@ -19,24 +19,31 @@
 
 package org.apache.texera.web.resource.dashboard.user.workflow
 
+import org.apache.texera.amber.core.storage.{VFSResourceType, VFSURIFactory}
 import org.apache.texera.amber.core.virtualidentity.{
   ExecutionIdentity,
   OperatorIdentity,
-  PhysicalOpIdentity
+  PhysicalOpIdentity,
+  WorkflowIdentity
 }
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, PortIdentity}
 import org.apache.texera.amber.util.serde.GlobalPortIdentitySerde.SerdeOps
 import org.apache.texera.dao.MockTexeraDB
 import org.apache.texera.dao.jooq.generated.Tables._
+import org.apache.texera.dao.jooq.generated.enums.WorkflowComputingUnitTypeEnum
 import org.apache.texera.dao.jooq.generated.tables.daos.{
+  DatasetDao,
   UserDao,
+  WorkflowComputingUnitDao,
   WorkflowDao,
   WorkflowExecutionsDao,
   WorkflowVersionDao
 }
 import org.apache.texera.dao.jooq.generated.tables.pojos.{
+  Dataset,
   User,
   Workflow,
+  WorkflowComputingUnit,
   WorkflowExecutions,
   WorkflowVersion
 }
@@ -66,6 +73,8 @@ class WorkflowExecutionsResourceSpec
   private var workflowDao: WorkflowDao = _
   private var workflowVersionDao: WorkflowVersionDao = _
   private var workflowExecutionsDao: WorkflowExecutionsDao = _
+  private var datasetDao: DatasetDao = _
+  private var computingUnitDao: WorkflowComputingUnitDao = _
 
   override protected def beforeAll(): Unit = {
     initializeDBAndReplaceDSLContext()
@@ -96,6 +105,8 @@ class WorkflowExecutionsResourceSpec
     workflowVersionDao = new WorkflowVersionDao(getDSLContext.configuration())
     userDao = new UserDao(getDSLContext.configuration())
     workflowExecutionsDao = new 
WorkflowExecutionsDao(getDSLContext.configuration())
+    datasetDao = new DatasetDao(getDSLContext.configuration())
+    computingUnitDao = new 
WorkflowComputingUnitDao(getDSLContext.configuration())
 
     cleanupTestData()
 
@@ -109,18 +120,41 @@ class WorkflowExecutionsResourceSpec
   }
 
   private def cleanupTestData(): Unit = {
+    val vidSubquery = getDSLContext
+      .select(WORKFLOW_VERSION.VID)
+      .from(WORKFLOW_VERSION)
+      .where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
+
+    // Child tables of WORKFLOW_EXECUTIONS must be wiped before the parent row.
     getDSLContext
-      .deleteFrom(WORKFLOW_EXECUTIONS)
+      .deleteFrom(OPERATOR_PORT_EXECUTIONS)
+      .where(
+        OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
+          getDSLContext
+            .select(WORKFLOW_EXECUTIONS.EID)
+            .from(WORKFLOW_EXECUTIONS)
+            .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
+        )
+      )
+      .execute()
+
+    getDSLContext
+      .deleteFrom(OPERATOR_EXECUTIONS)
       .where(
-        WORKFLOW_EXECUTIONS.VID.in(
+        OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.in(
           getDSLContext
-            .select(WORKFLOW_VERSION.VID)
-            .from(WORKFLOW_VERSION)
-            .where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
+            .select(WORKFLOW_EXECUTIONS.EID)
+            .from(WORKFLOW_EXECUTIONS)
+            .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
         )
       )
       .execute()
 
+    getDSLContext
+      .deleteFrom(WORKFLOW_EXECUTIONS)
+      .where(WORKFLOW_EXECUTIONS.VID.in(vidSubquery))
+      .execute()
+
     getDSLContext
       .deleteFrom(WORKFLOW_VERSION)
       .where(WORKFLOW_VERSION.WID.eq(testWorkflowWid))
@@ -131,6 +165,17 @@ class WorkflowExecutionsResourceSpec
       .where(WORKFLOW.WID.eq(testWorkflowWid))
       .execute()
 
+    // Datasets / computing units / extra users may be seeded by individual 
cases.
+    getDSLContext
+      .deleteFrom(DATASET)
+      
.where(DATASET.OWNER_UID.in(getDSLContext.select(USER.UID).from(USER).where(USER.UID.ne(0))))
+      .execute()
+
+    getDSLContext
+      .deleteFrom(WORKFLOW_COMPUTING_UNIT)
+      .where(WORKFLOW_COMPUTING_UNIT.UID.eq(testUserId))
+      .execute()
+
     getDSLContext
       .deleteFrom(USER)
       .where(USER.UID.eq(testUserId))
@@ -141,24 +186,59 @@ class WorkflowExecutionsResourceSpec
     shutdownDB()
   }
 
+  // ─── helpers ──────────────────────────────────────────────────────────────
+
+  private def insertComputingUnit(): WorkflowComputingUnit = {
+    val unit = new WorkflowComputingUnit
+    unit.setUid(testUser.getUid)
+    unit.setName("test-unit-" + UUID.randomUUID().toString.substring(0, 8))
+    unit.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    unit.setType(WorkflowComputingUnitTypeEnum.local)
+    unit.setUri("local://test")
+    unit.setResource("{}")
+    computingUnitDao.insert(unit)
+    unit
+  }
+
+  private def insertExecution(
+      name: String = s"Execution-${UUID.randomUUID().toString.substring(0, 
8)}",
+      status: Byte = 0.toByte,
+      result: String = "",
+      logLocation: String = "",
+      startOffsetMillis: Long = 0L,
+      lastUpdateOffsetMillis: Option[Long] = None,
+      cuid: Integer = null,
+      runtimeStatsUri: String = null
+  ): WorkflowExecutions = {
+    val execution = new WorkflowExecutions
+    execution.setVid(testVersion.getVid)
+    execution.setUid(testUser.getUid)
+    execution.setStatus(status)
+    execution.setResult(result)
+    execution.setLogLocation(logLocation)
+    val now = System.currentTimeMillis()
+    execution.setStartingTime(new Timestamp(now - startOffsetMillis))
+    lastUpdateOffsetMillis.foreach(off => execution.setLastUpdateTime(new 
Timestamp(now - off)))
+    execution.setBookmarked(false)
+    execution.setName(name)
+    execution.setEnvironmentVersion("test-env-1.0")
+    execution.setCuid(cuid)
+    execution.setRuntimeStatsUri(runtimeStatsUri)
+    workflowExecutionsDao.insert(execution)
+    execution
+  }
+
+  // ─── existing tests (preserved) ───────────────────────────────────────────
+
   "WorkflowExecutionsResource.getWorkflowExecutions" should "return executions 
with EIDs in descending order" in {
     val numExecutions = 10
     val executionIds = ArrayBuffer.empty[Integer]
 
     for (i <- 1 to numExecutions) {
-      val execution = new WorkflowExecutions
-      execution.setVid(testVersion.getVid)
-      execution.setUid(testUser.getUid)
-      execution.setStatus(0.toByte)
-      execution.setResult("")
-      execution.setStartingTime(
-        new Timestamp(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(numExecutions - i))
+      val execution = insertExecution(
+        name = s"Execution ${i}",
+        startOffsetMillis = TimeUnit.DAYS.toMillis(numExecutions - i)
       )
-      execution.setBookmarked(false)
-      execution.setName(s"Execution ${i}")
-      execution.setEnvironmentVersion("test-env-1.0")
-
-      workflowExecutionsDao.insert(execution)
       executionIds.append(execution.getEid)
     }
 
@@ -185,16 +265,7 @@ class WorkflowExecutionsResourceSpec
   }
 
   "WorkflowExecutionsResource.insertOperatorPortResultUri" should "insert a 
result URI row" in {
-    val execution = new WorkflowExecutions
-    execution.setVid(testVersion.getVid)
-    execution.setUid(testUser.getUid)
-    execution.setStatus(0.toByte)
-    execution.setResult("")
-    execution.setStartingTime(new Timestamp(System.currentTimeMillis()))
-    execution.setBookmarked(false)
-    execution.setName("Execution with duplicate result URI insert")
-    execution.setEnvironmentVersion("test-env-1.0")
-    workflowExecutionsDao.insert(execution)
+    val execution = insertExecution(name = "Execution with duplicate result 
URI insert")
 
     val executionId = ExecutionIdentity(execution.getEid.longValue())
     val globalPortId = GlobalPortIdentity(
@@ -216,4 +287,460 @@ class WorkflowExecutionsResourceSpec
     assert(rows.get(0).getResultUri == uri.toString)
   }
 
+  // ─── new: status-filtered execution listing ───────────────────────────────
+
+  "getWorkflowExecutions with statusCodes" should "narrow results to the 
requested codes" in {
+    insertExecution(status = 1.toByte)
+    insertExecution(status = 2.toByte)
+    insertExecution(status = 1.toByte)
+
+    val onlyStatusOne =
+      WorkflowExecutionsResource.getWorkflowExecutions(
+        testWorkflowWid,
+        getDSLContext,
+        Set(1.toByte)
+      )
+    assert(onlyStatusOne.size == 2)
+    assert(onlyStatusOne.forall(_.status == 1.toByte))
+  }
+
+  // ─── new: getLatestExecutionID ────────────────────────────────────────────
+
+  "getLatestExecutionID" should "return None when no executions exist for the 
(wid, cuid) pair" in {
+    val result =
+      WorkflowExecutionsResource.getLatestExecutionID(testWorkflowWid, 
Integer.valueOf(999))
+    assert(result.isEmpty)
+  }
+
+  it should "return the largest EID for matching (wid, cuid)" in {
+    // cuid has an FK to WORKFLOW_COMPUTING_UNIT — seed two units.
+    val unitA = insertComputingUnit()
+    val unitB = insertComputingUnit()
+
+    val a = insertExecution(cuid = unitA.getCuid)
+    val b = insertExecution(cuid = unitA.getCuid)
+    // Distractor with a different cuid — should be ignored.
+    insertExecution(cuid = unitB.getCuid)
+
+    val result = 
WorkflowExecutionsResource.getLatestExecutionID(testWorkflowWid, unitA.getCuid)
+    assert(result.isDefined)
+    assert(result.get == math.max(a.getEid, b.getEid))
+  }
+
+  // ─── new: getExpiredExecutionsWithResultOrLog ─────────────────────────────
+
+  "getExpiredExecutionsWithResultOrLog" should "match rows that are stale by 
starting_time and have a result" in {
+    // Stale-by-starting-time + has result → match.
+    val expired = insertExecution(
+      name = "expired-with-result",
+      result = "some-result",
+      startOffsetMillis = TimeUnit.SECONDS.toMillis(120)
+    )
+    // Fresh starting_time → must not match.
+    insertExecution(name = "fresh", result = "some-result")
+    // Stale but empty result+log → must not match.
+    insertExecution(
+      name = "stale-but-empty",
+      startOffsetMillis = TimeUnit.SECONDS.toMillis(120)
+    )
+
+    val matched = 
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(60)
+
+    val eids = matched.map(_.getEid).toSet
+    assert(eids.contains(expired.getEid))
+    assert(matched.forall(e => e.getResult.nonEmpty || 
Option(e.getLogLocation).exists(_.nonEmpty)))
+  }
+
+  it should "match rows that are stale by last_update_time and have a 
log_location" in {
+    val expired = insertExecution(
+      name = "log-stale",
+      logLocation = "file:///tmp/log",
+      lastUpdateOffsetMillis = Some(TimeUnit.SECONDS.toMillis(120))
+    )
+    insertExecution(
+      name = "log-fresh",
+      logLocation = "file:///tmp/log-2",
+      lastUpdateOffsetMillis = Some(0L)
+    )
+
+    val matched = 
WorkflowExecutionsResource.getExpiredExecutionsWithResultOrLog(60)
+    assert(matched.map(_.getEid).toSet.contains(expired.getEid))
+  }
+
+  // ─── new: insertOperatorExecutions ────────────────────────────────────────
+
+  "insertOperatorExecutions" should "insert one OPERATOR_EXECUTIONS row" in {
+    val execution = insertExecution()
+    val uri = URI.create("vfs:///console-msg")
+
+    WorkflowExecutionsResource.insertOperatorExecutions(
+      execution.getEid.longValue(),
+      "op-A",
+      uri
+    )
+
+    val rows = getDSLContext
+      .selectFrom(OPERATOR_EXECUTIONS)
+      .where(OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+      .and(OPERATOR_EXECUTIONS.OPERATOR_ID.eq("op-A"))
+      .fetch()
+
+    assert(rows.size() == 1)
+    assert(rows.get(0).getConsoleMessagesUri == uri.toString)
+  }
+
+  // ─── new: updateRuntimeStatsUri ───────────────────────────────────────────
+
+  "updateRuntimeStatsUri" should "set the runtime_stats_uri on the matching 
execution" in {
+    val execution = insertExecution()
+    val uri = URI.create("vfs:///runtime-stats")
+
+    WorkflowExecutionsResource.updateRuntimeStatsUri(
+      testWorkflowWid.longValue(),
+      execution.getEid.longValue(),
+      uri
+    )
+
+    val refreshed = workflowExecutionsDao.fetchOneByEid(execution.getEid)
+    assert(refreshed.getRuntimeStatsUri == uri.toString)
+  }
+
+  it should "leave executions belonging to other workflows untouched" in {
+    val execution = insertExecution()
+    val uri = URI.create("vfs:///runtime-stats")
+
+    // wid that does not match the execution's WORKFLOW_VERSION row → no-op.
+    WorkflowExecutionsResource.updateRuntimeStatsUri(
+      (testWorkflowWid + 100000).longValue(),
+      execution.getEid.longValue(),
+      uri
+    )
+
+    val refreshed = workflowExecutionsDao.fetchOneByEid(execution.getEid)
+    assert(refreshed.getRuntimeStatsUri == null)
+  }
+
+  // ─── new: URI fetchers ────────────────────────────────────────────────────
+
+  "getResultUrisByExecutionId" should "return inserted URIs and filter out 
null/empty entries" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+    val opA = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("opA"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    val opB = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("opB"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    val opC = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("opC"), "main"),
+      PortIdentity(),
+      input = false
+    )
+
+    WorkflowExecutionsResource.insertOperatorPortResultUri(eid, opA, 
URI.create("vfs:///A"))
+    WorkflowExecutionsResource.insertOperatorPortResultUri(eid, opB, 
URI.create("vfs:///B"))
+    // Empty-string URI row — the helper should drop it from the returned list.
+    getDSLContext
+      .insertInto(OPERATOR_PORT_EXECUTIONS)
+      .columns(
+        OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+        OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID,
+        OPERATOR_PORT_EXECUTIONS.RESULT_URI
+      )
+      .values(execution.getEid, opC.serializeAsString, "")
+      .execute()
+
+    val uris = WorkflowExecutionsResource.getResultUrisByExecutionId(eid)
+    assert(uris.toSet == Set(URI.create("vfs:///A"), URI.create("vfs:///B")))
+  }
+
+  "getConsoleMessagesUriByExecutionId" should "return inserted URIs and filter 
empty entries" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+
+    WorkflowExecutionsResource.insertOperatorExecutions(
+      execution.getEid.longValue(),
+      "op-A",
+      URI.create("vfs:///console-A")
+    )
+    WorkflowExecutionsResource.insertOperatorExecutions(
+      execution.getEid.longValue(),
+      "op-B",
+      URI.create("vfs:///console-B")
+    )
+    // Empty-URI row — must be filtered.
+    getDSLContext
+      .insertInto(OPERATOR_EXECUTIONS)
+      .columns(
+        OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID,
+        OPERATOR_EXECUTIONS.OPERATOR_ID,
+        OPERATOR_EXECUTIONS.CONSOLE_MESSAGES_URI
+      )
+      .values(execution.getEid, "op-C", "")
+      .execute()
+
+    val uris = 
WorkflowExecutionsResource.getConsoleMessagesUriByExecutionId(eid)
+    assert(uris.toSet == Set(URI.create("vfs:///console-A"), 
URI.create("vfs:///console-B")))
+  }
+
+  "getRuntimeStatsUriByExecutionId" should "return None when the stored URI is 
null or empty" in {
+    val noUri = insertExecution()
+    assert(
+      WorkflowExecutionsResource
+        
.getRuntimeStatsUriByExecutionId(ExecutionIdentity(noUri.getEid.longValue()))
+        .isEmpty
+    )
+
+    val emptyUri = insertExecution(runtimeStatsUri = "")
+    assert(
+      WorkflowExecutionsResource
+        
.getRuntimeStatsUriByExecutionId(ExecutionIdentity(emptyUri.getEid.longValue()))
+        .isEmpty
+    )
+  }
+
+  it should "return Some(URI) when the stored URI is non-empty" in {
+    val withUri = insertExecution(runtimeStatsUri = "vfs:///stats")
+    val result = WorkflowExecutionsResource.getRuntimeStatsUriByExecutionId(
+      ExecutionIdentity(withUri.getEid.longValue())
+    )
+    assert(result.contains(URI.create("vfs:///stats")))
+  }
+
+  // ─── new: deleteConsoleMessageAndExecutionResultUris ──────────────────────
+
+  "deleteConsoleMessageAndExecutionResultUris" should "purge both child tables 
for a given eid" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+
+    val globalPortId = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("op-purge"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    WorkflowExecutionsResource.insertOperatorPortResultUri(
+      eid,
+      globalPortId,
+      URI.create("vfs:///r")
+    )
+    WorkflowExecutionsResource.insertOperatorExecutions(
+      execution.getEid.longValue(),
+      "op-purge",
+      URI.create("vfs:///c")
+    )
+
+    WorkflowExecutionsResource.deleteConsoleMessageAndExecutionResultUris(eid)
+
+    val resultRows = getDSLContext
+      .fetchCount(
+        OPERATOR_PORT_EXECUTIONS,
+        OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid)
+      )
+    val consoleRows = getDSLContext
+      .fetchCount(
+        OPERATOR_EXECUTIONS,
+        OPERATOR_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid)
+      )
+    assert(resultRows == 0)
+    assert(consoleRows == 0)
+  }
+
+  // ─── new: removeAllExecutionFiles (DB delete branch) ──────────────────────
+
+  "removeAllExecutionFiles" should "delete the listed executions from 
WORKFLOW_EXECUTIONS" in {
+    val a = insertExecution()
+    val b = insertExecution()
+    // Distractor that should survive.
+    val survivor = insertExecution()
+
+    WorkflowExecutionsResource.removeAllExecutionFiles(Array(a.getEid, 
b.getEid))
+
+    val survivors = workflowExecutionsDao.findAll()
+    val survivorEids = 
survivors.toArray.map(_.asInstanceOf[WorkflowExecutions].getEid).toSet
+    assert(!survivorEids.contains(a.getEid))
+    assert(!survivorEids.contains(b.getEid))
+    assert(survivorEids.contains(survivor.getEid))
+  }
+
+  // ─── new: updateResultSize ────────────────────────────────────────────────
+
+  "updateResultSize" should "set RESULT_SIZE on the matching (eid, 
globalPortId) row" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+    val globalPortId = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("op-size"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    WorkflowExecutionsResource.insertOperatorPortResultUri(
+      eid,
+      globalPortId,
+      URI.create("vfs:///r")
+    )
+
+    WorkflowExecutionsResource.updateResultSize(eid, globalPortId, 4096L)
+
+    val row = getDSLContext
+      .selectFrom(OPERATOR_PORT_EXECUTIONS)
+      
.where(OPERATOR_PORT_EXECUTIONS.WORKFLOW_EXECUTION_ID.eq(execution.getEid))
+      
.and(OPERATOR_PORT_EXECUTIONS.GLOBAL_PORT_ID.eq(globalPortId.serializeAsString))
+      .fetchOne()
+    assert(row.getResultSize == 4096)
+  }
+
+  // ─── new: getResultUriByLogicalPortId ─────────────────────────────────────
+
+  "getResultUriByLogicalPortId" should "match by logical operator id, port id, 
and resource type" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+    val wfId = WorkflowIdentity(testWorkflowWid.longValue())
+
+    // Build a real VFS result URI that decodeURI can parse.
+    val targetOpId = OperatorIdentity("target-op")
+    val targetPortId = PortIdentity()
+    val targetGlobalPort = GlobalPortIdentity(
+      PhysicalOpIdentity(targetOpId, "main"),
+      targetPortId,
+      input = false
+    )
+    val targetUri = VFSURIFactory.resultURI(
+      VFSURIFactory.createPortBaseURI(wfId, eid, targetGlobalPort)
+    )
+    WorkflowExecutionsResource.insertOperatorPortResultUri(eid, 
targetGlobalPort, targetUri)
+
+    // Distractor: same workflow, different op id.
+    val otherGlobalPort = GlobalPortIdentity(
+      PhysicalOpIdentity(OperatorIdentity("other-op"), "main"),
+      PortIdentity(),
+      input = false
+    )
+    val otherUri = VFSURIFactory.resultURI(
+      VFSURIFactory.createPortBaseURI(wfId, eid, otherGlobalPort)
+    )
+    WorkflowExecutionsResource.insertOperatorPortResultUri(eid, 
otherGlobalPort, otherUri)
+
+    val found =
+      WorkflowExecutionsResource.getResultUriByLogicalPortId(eid, targetOpId, 
targetPortId)
+    assert(found.contains(targetUri))
+
+    // Sanity-check: the decoded URI is RESULT-typed and matches the target 
ids.
+    val (_, _, gpOpt, resType) = VFSURIFactory.decodeURI(found.get)
+    assert(resType == VFSResourceType.RESULT)
+    assert(gpOpt.exists(gp => gp.opId.logicalOpId == targetOpId && gp.portId 
== targetPortId))
+  }
+
+  it should "return None when no URI matches the requested op/port" in {
+    val execution = insertExecution()
+    val eid = ExecutionIdentity(execution.getEid.longValue())
+    val found =
+      WorkflowExecutionsResource.getResultUriByLogicalPortId(
+        eid,
+        OperatorIdentity("nope"),
+        PortIdentity()
+      )
+    assert(found.isEmpty)
+  }
+
+  // ─── new: getNonDownloadableOperatorMap (private — via 
PrivateMethodTester) ─
+
+  "getNonDownloadableOperatorMap" should "flag operators reading 
non-downloadable datasets they don't own" in {
+    // Owner of the non-downloadable dataset is a *different* user than 
testUser.
+    val otherUser = new User
+    val otherUid = testUserId + 1
+    otherUser.setUid(otherUid)
+    otherUser.setName("dataset-owner")
+    otherUser.setEmail("[email protected]")
+    otherUser.setPassword("password")
+    userDao.insert(otherUser)
+
+    val dataset = new Dataset
+    dataset.setOwnerUid(otherUid)
+    dataset.setName("LockedDS")
+    dataset.setRepositoryName("repo-locked")
+    dataset.setIsPublic(false)
+    dataset.setIsDownloadable(false)
+    dataset.setDescription("")
+    dataset.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    datasetDao.insert(dataset)
+
+    // Workflow content: scan op A reading the locked dataset, then a 
downstream op B.
+    val content =
+      """{
+        |  "operators": [
+        |    {"operatorID": "scanA", "operatorProperties": {"fileName": 
"/[email protected]/LockedDS/v1/data.csv"}},
+        |    {"operatorID": "downstreamB", "operatorProperties": {}}
+        |  ],
+        |  "links": [
+        |    {"source": {"operatorID": "scanA"}, "target": {"operatorID": 
"downstreamB"}}
+        |  ]
+        |}""".stripMargin
+    testWorkflow.setContent(content)
+    workflowDao.update(testWorkflow)
+
+    val privateMethod =
+      PrivateMethod[Map[String, Set[(String, 
String)]]](Symbol("getNonDownloadableOperatorMap"))
+    val result = WorkflowExecutionsResource invokePrivate 
privateMethod(testWorkflowWid, testUser)
+
+    assert(result.contains("scanA"))
+    assert(result("scanA").contains(("[email protected]", "LockedDS")))
+    // BFS propagates the restriction to the downstream operator.
+    assert(result.contains("downstreamB"))
+  }
+
+  it should "return an empty map when the workflow content is unparseable" in {
+    testWorkflow.setContent("not-json")
+    workflowDao.update(testWorkflow)
+
+    val privateMethod =
+      PrivateMethod[Map[String, Set[(String, 
String)]]](Symbol("getNonDownloadableOperatorMap"))
+    val result = WorkflowExecutionsResource invokePrivate 
privateMethod(testWorkflowWid, testUser)
+    assert(result.isEmpty)
+  }
+
+  it should "return an empty map when the workflow has no operators 
referencing datasets" in {
+    val content =
+      """{"operators": [{"operatorID": "x", "operatorProperties": {}}], 
"links": []}"""
+    testWorkflow.setContent(content)
+    workflowDao.update(testWorkflow)
+
+    val privateMethod =
+      PrivateMethod[Map[String, Set[(String, 
String)]]](Symbol("getNonDownloadableOperatorMap"))
+    val result = WorkflowExecutionsResource invokePrivate 
privateMethod(testWorkflowWid, testUser)
+    assert(result.isEmpty)
+  }
+
+  it should "skip restriction when the current user is the dataset owner" in {
+    // The dataset is owned by testUser ([email protected]), and the operator 
points
+    // to /[email protected]/MyDS/v1/file.csv → no restriction even though
+    // is_downloadable=false.
+    val dataset = new Dataset
+    dataset.setOwnerUid(testUserId)
+    dataset.setName("MyDS")
+    dataset.setRepositoryName("repo-my")
+    dataset.setIsPublic(false)
+    dataset.setIsDownloadable(false)
+    dataset.setDescription("")
+    dataset.setCreationTime(new Timestamp(System.currentTimeMillis()))
+    datasetDao.insert(dataset)
+
+    val content =
+      """{
+        |  "operators": [
+        |    {"operatorID": "scan", "operatorProperties": {"fileName": 
"/[email protected]/MyDS/v1/data.csv"}}
+        |  ],
+        |  "links": []
+        |}""".stripMargin
+    testWorkflow.setContent(content)
+    workflowDao.update(testWorkflow)
+
+    val privateMethod =
+      PrivateMethod[Map[String, Set[(String, 
String)]]](Symbol("getNonDownloadableOperatorMap"))
+    val result = WorkflowExecutionsResource invokePrivate 
privateMethod(testWorkflowWid, testUser)
+    assert(result.isEmpty)
+  }
+
 }

Reply via email to