This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization 
by this push:
     new 4223cf5640 refactor(storage): make port URIs symmetric over a base URI
4223cf5640 is described below

commit 4223cf56408e51283280fdc61bdb7008f5f6f2e7
Author: Xinyuan Lin <[email protected]>
AuthorDate: Wed May 6 17:56:10 2026 -0700

    refactor(storage): make port URIs symmetric over a base URI
    
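    Drop the result-as-primary asymmetry in VFSURIFactory: ports now build
    a single base URI (createPortBaseURI), with result and state URIs as
    equal first-class derivatives via resultURI(base) / stateURI(base).
    Removes the substring-replace `State.uriFromResultUri` /
    `State.uri_from_result_uri` helpers and replaces the asymmetric
    createResultURI / create_result_uri factory. Note that
    `OutputPortConfig.storageURI` now holds the port base URI, so every
    consumer must derive the result or state URI from it explicitly.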
---
 .../core/architecture/packaging/output_manager.py  |  7 +++--
 amber/src/main/python/core/models/state.py         |  4 ---
 .../input_port_materialization_reader_runnable.py  |  5 ++--
 .../main/python/core/storage/vfs_uri_factory.py    | 16 +++++++++--
 .../messaginglayer/OutputManager.scala             |  8 +++---
 .../scheduling/CostBasedScheduleGenerator.scala    |  6 ++--
 .../ExpansionGreedyScheduleGenerator.scala         |  4 +--
 .../scheduling/RegionExecutionCoordinator.scala    | 15 +++++-----
 .../InputPortMaterializationReaderThread.scala     |  6 ++--
 .../core/storage/iceberg/test_iceberg_document.py  | 32 ++++++++++++----------
 .../org/apache/texera/amber/core/state/State.scala |  4 ---
 .../texera/amber/core/storage/VFSURIFactory.scala  | 21 +++++++++-----
 .../amber/core/storage/VFSURIFactorySpec.scala     | 23 ++++++++++------
 .../result/iceberg/IcebergDocumentSpec.scala       |  8 ++++--
 .../result/iceberg/IcebergTableStatsSpec.scala     | 22 ++++++++-------
 15 files changed, 105 insertions(+), 76 deletions(-)

diff --git 
a/amber/src/main/python/core/architecture/packaging/output_manager.py 
b/amber/src/main/python/core/architecture/packaging/output_manager.py
index cea76904fe..8521b98ef4 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -45,6 +45,7 @@ from core.models import Tuple, Schema, StateFrame
 from core.models.payload import DataPayload, DataFrame
 from core.models.state import State
 from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
 from core.storage.runnables.port_storage_writer import (
     PortStorageWriter,
     PortStorageWriterElement,
@@ -131,7 +132,9 @@ class OutputManager:
         to storage in batch, and open a long-lived buffered writer for
         state materialization on the same port.
         """
-        document, _ = DocumentFactory.open_document(storage_uri)
+        document, _ = DocumentFactory.open_document(
+            VFSURIFactory.result_uri(storage_uri)
+        )
         buffered_item_writer = 
document.writer(str(get_worker_index(self.worker_id)))
         writer_queue = Queue()
         port_storage_writer = PortStorageWriter(
@@ -150,7 +153,7 @@ class OutputManager:
         )
 
         state_document, _ = DocumentFactory.open_document(
-            State.uri_from_result_uri(storage_uri)
+            VFSURIFactory.state_uri(storage_uri)
         )
         state_buffered_item_writer = state_document.writer(
             str(get_worker_index(self.worker_id))
diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 3ce610bbee..003aaa212a 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -41,10 +41,6 @@ class State(dict):
     def from_tuple(cls, row: Tuple) -> "State":
         return cls.from_json(row[cls.CONTENT])
 
-    @staticmethod
-    def uri_from_result_uri(result_uri: str) -> str:
-        return result_uri.replace("/result", "/state")
-
 
 _TYPE_MARKER = "__texera_type__"
 _PAYLOAD_MARKER = "payload"
diff --git 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index bdb678aac2..692bff6555 100644
--- 
a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ 
b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -37,6 +37,7 @@ from core.architecture.sendsemantics.round_robin_partitioner 
import (
 from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
 from core.models.internal_queue import DataElement, ECMElement
 from core.storage.document_factory import DocumentFactory
+from core.storage.vfs_uri_factory import VFSURIFactory
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
 from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
@@ -143,12 +144,12 @@ class InputPortMaterializationReaderRunnable(Runnable, 
Stoppable):
         """
         try:
             self.materialization, self.tuple_schema = 
DocumentFactory.open_document(
-                self.uri
+                VFSURIFactory.result_uri(self.uri)
             )
             self.emit_ecm("StartChannel", 
EmbeddedControlMessageType.NO_ALIGNMENT)
 
             state_document, _ = DocumentFactory.open_document(
-                State.uri_from_result_uri(self.uri)
+                VFSURIFactory.state_uri(self.uri)
             )
             for state_row in state_document.get():
                 self.emit_payload(StateFrame(State.from_tuple(state_row)))
diff --git a/amber/src/main/python/core/storage/vfs_uri_factory.py 
b/amber/src/main/python/core/storage/vfs_uri_factory.py
index 0e23e60705..883450abf2 100644
--- a/amber/src/main/python/core/storage/vfs_uri_factory.py
+++ b/amber/src/main/python/core/storage/vfs_uri_factory.py
@@ -89,12 +89,22 @@ class VFSURIFactory:
         )
 
     @staticmethod
-    def create_result_uri(workflow_id, execution_id, global_port_id) -> str:
-        """Creates a URI pointing to a result storage."""
-        base_uri = (
+    def create_port_base_uri(workflow_id, execution_id, global_port_id) -> str:
+        """Base URI for a port. Result and state URIs derive from it via
+        `result_uri` / `state_uri`.
+        """
+        return (
             f"{VFSURIFactory.VFS_FILE_URI_SCHEME}:///wid/{workflow_id.id}"
             f"/eid/{execution_id.id}/globalportid/"
             f"{serialize_global_port_identity(global_port_id)}"
         )
 
+    @staticmethod
+    def result_uri(base_uri: str) -> str:
+        """The result-resource URI under a port base URI."""
         return f"{base_uri}/{VFSResourceType.RESULT.value}"
+
+    @staticmethod
+    def state_uri(base_uri: str) -> str:
+        """The state-resource URI under a port base URI."""
+        return f"{base_uri}/{VFSResourceType.STATE.value}"
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index d76a76be33..3b233d8cf7 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.messaginglayer
 
 import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple._
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
@@ -295,9 +295,9 @@ class OutputManager(
     ports.head._1
   }
 
-  private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: 
URI): Unit = {
+  private def setupOutputStorageWriterThread(portId: PortIdentity, 
portBaseURI: URI): Unit = {
     val bufferedItemWriter = DocumentFactory
-      .openDocument(storageUri)
+      .openDocument(VFSURIFactory.resultURI(portBaseURI))
       ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
@@ -308,7 +308,7 @@ class OutputManager(
     // The state document is provisioned alongside the result document
     // by RegionExecutionCoordinator, so it is always present.
     val stateWriter = DocumentFactory
-      .openDocument(State.uriFromResultUri(storageUri))
+      .openDocument(VFSURIFactory.stateURI(portBaseURI))
       ._1
       .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
       .asInstanceOf[BufferedItemWriter[Tuple]]
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
index 401ccddc0a..d2c3175368 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/CostBasedScheduleGenerator.scala
@@ -20,7 +20,7 @@
 package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
PhysicalOpIdentity}
 import org.apache.texera.amber.core.workflow._
 import 
org.apache.texera.amber.engine.architecture.scheduling.SchedulingUtils.replaceVertex
@@ -174,12 +174,12 @@ class CostBasedScheduleGenerator(
         // Allocate an URI for each of these output ports
         val outputPortConfigs: Map[GlobalPortIdentity, OutputPortConfig] =
           outputPortIdsNeedingStorage.map { gpid =>
-            val outputWriterURI = createResultURI(
+            val portBaseURI = createPortBaseURI(
               workflowId = workflowContext.workflowId,
               executionId = workflowContext.executionId,
               globalPortId = gpid
             )
-            gpid -> OutputPortConfig(outputWriterURI)
+            gpid -> OutputPortConfig(portBaseURI)
           }.toMap
 
         val resourceConfig = ResourceConfig(portConfigs = outputPortConfigs)
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
index 4bb8933896..304e1496f8 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/ExpansionGreedyScheduleGenerator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.storage.VFSURIFactory.createResultURI
+import org.apache.texera.amber.core.storage.VFSURIFactory.createPortBaseURI
 import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
 import org.apache.texera.amber.core.workflow.{
   GlobalPortIdentity,
@@ -331,7 +331,7 @@ class ExpansionGreedyScheduleGenerator(
 
   private def getStorageURIFromGlobalOutputPortId(outputPortId: 
GlobalPortIdentity) = {
     assert(!outputPortId.input)
-    createResultURI(
+    createPortBaseURI(
       workflowId = workflowContext.workflowId,
       executionId = workflowContext.executionId,
       globalPortId = outputPortId
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 58fdf9f242..9262fcee18 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -22,7 +22,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 import org.apache.pekko.pattern.gracefulStop
 import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
 import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
 import org.apache.texera.amber.core.workflow.{GlobalPortIdentity, 
PhysicalLink, PhysicalOp}
@@ -569,20 +569,21 @@ class RegionExecutionCoordinator(
   ): Unit = {
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
-        val storageUriToAdd = portConfig.storageURI
-        val stateUriToAdd = State.uriFromResultUri(storageUriToAdd)
-        val (_, eid, _, _) = decodeURI(storageUriToAdd)
+        val portBaseURI = portConfig.storageURI
+        val resultURI = VFSURIFactory.resultURI(portBaseURI)
+        val stateURI = VFSURIFactory.stateURI(portBaseURI)
         val schemaOptional =
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
-        DocumentFactory.createDocument(storageUriToAdd, schema)
-        DocumentFactory.createDocument(stateUriToAdd, State.schema)
+        DocumentFactory.createDocument(resultURI, schema)
+        DocumentFactory.createDocument(stateURI, State.schema)
         if (!isRestart) {
+          val (_, eid, _, _) = decodeURI(resultURI)
           WorkflowExecutionsResource.insertOperatorPortResultUri(
             eid = eid,
             globalPortId = outputPortId,
-            uri = storageUriToAdd
+            uri = resultURI
           )
         }
     }
diff --git 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index e06453b1a3..a7d259c37e 100644
--- 
a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ 
b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -22,7 +22,7 @@ package 
org.apache.texera.amber.engine.architecture.worker.managers
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
 import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.storage.DocumentFactory
+import org.apache.texera.amber.core.storage.{DocumentFactory, VFSURIFactory}
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.{
@@ -99,7 +99,7 @@ class InputPortMaterializationReaderThread(
     try {
       val stateDocument =
         DocumentFactory
-          .openDocument(State.uriFromResultUri(uri))
+          .openDocument(VFSURIFactory.stateURI(uri))
           ._1
           .asInstanceOf[VirtualDocument[Tuple]]
       val stateReadIterator = stateDocument.get()
@@ -111,7 +111,7 @@ class InputPortMaterializationReaderThread(
       }
 
       val materialization: VirtualDocument[Tuple] = DocumentFactory
-        .openDocument(uri)
+        .openDocument(VFSURIFactory.resultURI(uri))
         ._1
         .asInstanceOf[VirtualDocument[Tuple]]
       val storageReadIterator = materialization.get()
diff --git 
a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py 
b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
index 44e62bbb8a..381f8e5ff6 100644
--- a/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/test/python/core/storage/iceberg/test_iceberg_document.py
@@ -83,17 +83,21 @@ class TestIcebergDocument:
         with a random operator id
         """
         operator_uuid = str(uuid.uuid4()).replace("-", "")
-        uri = VFSURIFactory.create_result_uri(
-            WorkflowIdentity(id=0),
-            ExecutionIdentity(id=0),
-            GlobalPortIdentity(
-                op_id=PhysicalOpIdentity(
-                    
logical_op_id=OperatorIdentity(id=f"test_table_{operator_uuid}"),
-                    layer_name="main",
+        uri = VFSURIFactory.result_uri(
+            VFSURIFactory.create_port_base_uri(
+                WorkflowIdentity(id=0),
+                ExecutionIdentity(id=0),
+                GlobalPortIdentity(
+                    op_id=PhysicalOpIdentity(
+                        logical_op_id=OperatorIdentity(
+                            id=f"test_table_{operator_uuid}"
+                        ),
+                        layer_name="main",
+                    ),
+                    port_id=PortIdentity(id=0),
+                    input=False,
                 ),
-                port_id=PortIdentity(id=0),
-                input=False,
-            ),
+            )
         )
         DocumentFactory.create_document(uri, amber_schema)
         document, _ = DocumentFactory.open_document(uri)
@@ -327,7 +331,7 @@ class TestIcebergDocument:
 
     def test_state_materialization_round_trip(self):
         operator_uuid = str(uuid.uuid4()).replace("-", "")
-        result_uri = VFSURIFactory.create_result_uri(
+        base_uri = VFSURIFactory.create_port_base_uri(
             WorkflowIdentity(id=0),
             ExecutionIdentity(id=0),
             GlobalPortIdentity(
@@ -339,7 +343,7 @@ class TestIcebergDocument:
                 input=False,
             ),
         )
-        state_uri = State.uri_from_result_uri(result_uri)
+        state_uri = VFSURIFactory.state_uri(base_uri)
         DocumentFactory.create_document(state_uri, State.SCHEMA)
         document, _ = DocumentFactory.open_document(state_uri)
 
@@ -363,7 +367,7 @@ class TestIcebergDocument:
 
     def test_multiple_states_materialize_as_rows_in_one_table(self):
         operator_uuid = str(uuid.uuid4()).replace("-", "")
-        result_uri = VFSURIFactory.create_result_uri(
+        base_uri = VFSURIFactory.create_port_base_uri(
             WorkflowIdentity(id=0),
             ExecutionIdentity(id=0),
             GlobalPortIdentity(
@@ -377,7 +381,7 @@ class TestIcebergDocument:
                 input=False,
             ),
         )
-        state_uri = State.uri_from_result_uri(result_uri)
+        state_uri = VFSURIFactory.state_uri(base_uri)
         DocumentFactory.create_document(state_uri, State.SCHEMA)
         document, _ = DocumentFactory.open_document(state_uri)
 
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
index 532f355c17..ba146f1d57 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode
 import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
 import org.apache.texera.amber.util.JSONUtils.objectMapper
 
-import java.net.URI
 import java.util.Base64
 import scala.jdk.CollectionConverters.IteratorHasAsScala
 
@@ -58,9 +57,6 @@ object State {
 
   def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
 
-  def uriFromResultUri(resultUri: URI): URI =
-    new URI(resultUri.toString.replace("/result", "/state"))
-
   private def toJsonValue(value: Any): Any =
     value match {
       case null => null
diff --git 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
index e687b28a29..291c31896b 100644
--- 
a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
+++ 
b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala
@@ -84,18 +84,25 @@ object VFSURIFactory {
   }
 
   /**
-    * Create a URI pointing to a result storage
+    * Create the base URI for a port. Result and state URIs are derived
+    * from this base via `resultURI` / `stateURI`.
     */
-  def createResultURI(
+  def createPortBaseURI(
       workflowId: WorkflowIdentity,
       executionId: ExecutionIdentity,
       globalPortId: GlobalPortIdentity
-  ): URI = {
-    val baseUri =
-      
s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}/globalportid/${globalPortId.serializeAsString}"
+  ): URI =
+    new URI(
+      s"$VFS_FILE_URI_SCHEME:///wid/${workflowId.id}/eid/${executionId.id}" +
+        s"/globalportid/${globalPortId.serializeAsString}"
+    )
 
-    new URI(s"$baseUri/${VFSResourceType.RESULT.toString.toLowerCase}")
-  }
+  def resultURI(baseURI: URI): URI = appendResource(baseURI, 
VFSResourceType.RESULT)
+
+  def stateURI(baseURI: URI): URI = appendResource(baseURI, 
VFSResourceType.STATE)
+
+  private def appendResource(baseURI: URI, resourceType: 
VFSResourceType.Value): URI =
+    new URI(s"$baseURI/${resourceType.toString.toLowerCase}")
 
   /**
     * Create a URI pointing to runtime statistics
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
index 6fbe35873a..0b8ae4a19c 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/core/storage/VFSURIFactorySpec.scala
@@ -42,23 +42,30 @@ class VFSURIFactorySpec extends AnyFlatSpec {
       input = true
     )
 
-  "VFSURIFactory.createResultURI" should "include workflow, execution, port, 
and the result resource type" in {
-    val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
-    assert(uri.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
-    val path = uri.getPath
+  "VFSURIFactory.createPortBaseURI" should "include workflow, execution, and 
port segments without a resource type" in {
+    val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId, 
portId)
+    assert(baseURI.getScheme == VFSURIFactory.VFS_FILE_URI_SCHEME)
+    val path = baseURI.getPath
     assert(path.contains("/wid/7"))
     assert(path.contains("/eid/11"))
     assert(path.contains("/globalportid/"))
-    assert(path.endsWith("/result"))
+    assert(!path.endsWith("/result"))
+    assert(!path.endsWith("/state"))
   }
 
-  it should "round-trip through decodeURI" in {
-    val uri = VFSURIFactory.createResultURI(workflowId, executionId, portId)
-    val (wid, eid, globalPortIdOpt, resourceType) = 
VFSURIFactory.decodeURI(uri)
+  "VFSURIFactory.resultURI / stateURI" should "append the resource segment and 
round-trip through decodeURI" in {
+    val baseURI = VFSURIFactory.createPortBaseURI(workflowId, executionId, 
portId)
+    val resultURI = VFSURIFactory.resultURI(baseURI)
+    val stateURI = VFSURIFactory.stateURI(baseURI)
+    assert(resultURI.getPath.endsWith("/result"))
+    assert(stateURI.getPath.endsWith("/state"))
+
+    val (wid, eid, globalPortIdOpt, resourceType) = 
VFSURIFactory.decodeURI(resultURI)
     assert(wid == workflowId)
     assert(eid == executionId)
     assert(globalPortIdOpt.contains(portId))
     assert(resourceType == VFSResourceType.RESULT)
+    assert(VFSURIFactory.decodeURI(stateURI)._4 == VFSResourceType.STATE)
   }
 
   "VFSURIFactory.createRuntimeStatisticsURI" should "produce a 
runtimeStatistics URI without an opid segment" in {
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 7f1d8573c2..b865fff94d 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -52,6 +52,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
   var deserde: (IcebergSchema, Record) => Tuple = _
   var catalog: Catalog = _
   val tableNamespace = "test_namespace"
+  var baseURI: URI = _
   var uri: URI = _
 
   override def beforeAll(): Unit = {
@@ -80,7 +81,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
 
   override def beforeEach(): Unit = {
     // Generate a unique table name for each test
-    uri = VFSURIFactory.createResultURI(
+    baseURI = VFSURIFactory.createPortBaseURI(
       WorkflowIdentity(0),
       ExecutionIdentity(0),
       GlobalPortIdentity(
@@ -92,6 +93,7 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] 
with BeforeAndAfter
         PortIdentity()
       )
     )
+    uri = VFSURIFactory.resultURI(baseURI)
     DocumentFactory.createDocument(uri, amberSchema)
     super.beforeEach()
   }
@@ -143,7 +145,7 @@ class IcebergDocumentSpec extends 
VirtualDocumentSpec[Tuple] with BeforeAndAfter
   }
 
   it should "round trip materialized state documents" in {
-    val stateUri = State.uriFromResultUri(uri)
+    val stateUri = VFSURIFactory.stateURI(baseURI)
     DocumentFactory.createDocument(stateUri, State.schema)
     val stateDocument =
       
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
@@ -172,7 +174,7 @@ class IcebergDocumentSpec extends 
VirtualDocumentSpec[Tuple] with BeforeAndAfter
   }
 
   it should "materialize multiple states as rows in one state table" in {
-    val stateUri = State.uriFromResultUri(uri)
+    val stateUri = VFSURIFactory.stateURI(baseURI)
     DocumentFactory.createDocument(stateUri, State.schema)
     val stateDocument =
       
DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
diff --git 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
index 175ebc2c01..b7611f6f77 100644
--- 
a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
+++ 
b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergTableStatsSpec.scala
@@ -50,16 +50,18 @@ class IcebergTableStatsSpec extends AnyFlatSpec with 
BeforeAndAfterAll with Suit
   var deserde: (IcebergSchema, Record) => Tuple = _
   var catalog: Catalog = _
   val tableNamespace = "test_namespace"
-  var uri: URI = VFSURIFactory.createResultURI(
-    WorkflowIdentity(0),
-    ExecutionIdentity(0),
-    GlobalPortIdentity(
-      PhysicalOpIdentity(
-        logicalOpId =
-          
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
-        layerName = "main"
-      ),
-      PortIdentity()
+  var uri: URI = VFSURIFactory.resultURI(
+    VFSURIFactory.createPortBaseURI(
+      WorkflowIdentity(0),
+      ExecutionIdentity(0),
+      GlobalPortIdentity(
+        PhysicalOpIdentity(
+          logicalOpId =
+            
OperatorIdentity(s"test_table_${UUID.randomUUID().toString.replace("-", "")}"),
+          layerName = "main"
+        ),
+        PortIdentity()
+      )
     )
   )
 

Reply via email to