This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-materialization by this push:
     new 3c4dbb8ff8 test: cover multiple state rows in materialization
3c4dbb8ff8 is described below

commit 3c4dbb8ff8c0e019e0849722afaaea7e35e96025
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 01:16:36 2026 -0700

    test: cover multiple state rows in materialization
---
 .../core/storage/iceberg/test_iceberg_document.py  | 44 ++++++++++++++++++++++
 .../result/iceberg/IcebergDocumentSpec.scala       | 38 +++++++++++++++++++
 2 files changed, 82 insertions(+)

diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 29e43f249c..f4d1c6b344 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -357,3 +357,47 @@ class TestIcebergDocument:
         stored_rows = list(document.get())
         assert len(stored_rows) == 1
         assert deserialize_state(stored_rows[0]) == state
+
+    def test_multiple_states_materialize_as_rows_in_one_table(self):
+        operator_uuid = str(uuid.uuid4()).replace("-", "")  # unique operator id per run
+        result_uri = VFSURIFactory.create_result_uri(
+            WorkflowIdentity(id=0),
+            ExecutionIdentity(id=0),
+            GlobalPortIdentity(
+                op_id=PhysicalOpIdentity(
+                    logical_op_id=OperatorIdentity(
+                        id=f"test_multiple_states_{operator_uuid}"
+                    ),
+                    layer_name="main",
+                ),
+                port_id=PortIdentity(id=0),
+                input=False,
+            ),
+        )
+        state_uri = state_uri_from_result_uri(result_uri)
+        DocumentFactory.create_document(state_uri, STATE_SCHEMA)
+        document, _ = DocumentFactory.open_document(state_uri)
+        # Two states; the second also carries a nested value besides the scalars.
+        states = [
+            {"loop_counter": 0, "i": 1, "payload": b"first"},
+            {
+                "loop_counter": 1,
+                "i": 2,
+                "payload": b"second",
+                "nested": {"values": [3, 4]},
+            },
+        ]
+        # Write every state through one writer so all rows land in one state table.
+        writer = document.writer(str(uuid.uuid4()))
+        writer.open()
+        for state in states:
+            writer.put_one(serialize_state(state))
+        writer.close()
+        # Expect one row per state; sort by loop_counter so row order is irrelevant.
+        stored_rows = list(document.get())
+        assert len(stored_rows) == len(states)
+        actual_states = sorted(
+            [deserialize_state(row) for row in stored_rows],
+            key=lambda state: state["loop_counter"],
+        )
+        assert actual_states == states
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 761efe6341..062d9d21cc 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -169,6 +169,44 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
     assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == 
List(1L, 2L, 3L))
   }
 
+  it should "materialize multiple states as rows in one state table" in {
+    val stateUri = State.stateUriFromResultUri(uri)
+    DocumentFactory.createDocument(stateUri, State.schema)
+    val stateDocument =
+      DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+    val states: List[State] = List(
+      Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3)),
+      Map(
+        "loop_counter" -> 1,
+        "i" -> 2,
+        "payload" -> Array[Byte](4, 5, 6),
+        "nested" -> Map("values" -> List(3, 4))
+      )
+    )
+    // Write both states through a single writer so they share one state table.
+    val writer = stateDocument.writer(UUID.randomUUID().toString)
+    writer.open()
+    states.foreach(state => writer.putOne(State.serialize(state)))
+    writer.close()
+    // Sort by loop_counter so each row lines up with its expected entry in `states`.
+    val deserializedStates =
+      stateDocument.get().toList.map(State.deserialize).sortBy(_("loop_counter").asInstanceOf[Long])
+    assert(deserializedStates.length == states.length)
+    deserializedStates.zip(states).foreach { // Int fields expected back as Long; bytes compared element-wise
+      case (actual, expected) =>
+        assert(actual("loop_counter") == expected("loop_counter").asInstanceOf[Int].toLong)
+        assert(actual("i") == expected("i").asInstanceOf[Int].toLong)
+        assert(
+          actual("payload")
+            .asInstanceOf[Array[Byte]]
+            .sameElements(expected("payload").asInstanceOf[Array[Byte]])
+        )
+    }
+    assert( // only the second state (index 1 after sorting) carries a "nested" entry
+      deserializedStates(1)("nested").asInstanceOf[Map[String, Any]]("values") == List(3L, 4L)
+    )
+  }
+
   /** Returns a dynamic proxy for `realTable` that increments `counter` on 
every `refresh()` call. */
   private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): 
Table =
     Proxy

Reply via email to