This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-state-materialization
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-state-materialization
by this push:
new 3c4dbb8ff8 test: cover multiple state rows in materialization
3c4dbb8ff8 is described below
commit 3c4dbb8ff8c0e019e0849722afaaea7e35e96025
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 23 01:16:36 2026 -0700
test: cover multiple state rows in materialization
---
.../core/storage/iceberg/test_iceberg_document.py | 44 ++++++++++++++++++++++
.../result/iceberg/IcebergDocumentSpec.scala | 38 +++++++++++++++++++
2 files changed, 82 insertions(+)
diff --git a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
index 29e43f249c..f4d1c6b344 100644
--- a/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
+++ b/amber/src/main/python/core/storage/iceberg/test_iceberg_document.py
@@ -357,3 +357,47 @@ class TestIcebergDocument:
stored_rows = list(document.get())
assert len(stored_rows) == 1
assert deserialize_state(stored_rows[0]) == state
+
+ def test_multiple_states_materialize_as_rows_in_one_table(self):
+ operator_uuid = str(uuid.uuid4()).replace("-", "")
+ result_uri = VFSURIFactory.create_result_uri(
+ WorkflowIdentity(id=0),
+ ExecutionIdentity(id=0),
+ GlobalPortIdentity(
+ op_id=PhysicalOpIdentity(
+ logical_op_id=OperatorIdentity(
+ id=f"test_multiple_states_{operator_uuid}"
+ ),
+ layer_name="main",
+ ),
+ port_id=PortIdentity(id=0),
+ input=False,
+ ),
+ )
+ state_uri = state_uri_from_result_uri(result_uri)
+ DocumentFactory.create_document(state_uri, STATE_SCHEMA)
+ document, _ = DocumentFactory.open_document(state_uri)
+
+ states = [
+ {"loop_counter": 0, "i": 1, "payload": b"first"},
+ {
+ "loop_counter": 1,
+ "i": 2,
+ "payload": b"second",
+ "nested": {"values": [3, 4]},
+ },
+ ]
+
+ writer = document.writer(str(uuid.uuid4()))
+ writer.open()
+ for state in states:
+ writer.put_one(serialize_state(state))
+ writer.close()
+
+ stored_rows = list(document.get())
+ assert len(stored_rows) == len(states)
+ actual_states = sorted(
+ [deserialize_state(row) for row in stored_rows],
+ key=lambda state: state["loop_counter"],
+ )
+ assert actual_states == states
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 761efe6341..062d9d21cc 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -169,6 +169,44 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
assert(deserialized("nested").asInstanceOf[Map[String, Any]]("values") == List(1L, 2L, 3L))
}
+ it should "materialize multiple states as rows in one state table" in {
+ val stateUri = State.stateUriFromResultUri(uri)
+ DocumentFactory.createDocument(stateUri, State.schema)
+ val stateDocument =
+ DocumentFactory.openDocument(stateUri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ val states: List[State] = List(
+ Map("loop_counter" -> 0, "i" -> 1, "payload" -> Array[Byte](1, 2, 3)),
+ Map(
+ "loop_counter" -> 1,
+ "i" -> 2,
+ "payload" -> Array[Byte](4, 5, 6),
+ "nested" -> Map("values" -> List(3, 4))
+ )
+ )
+
+ val writer = stateDocument.writer(UUID.randomUUID().toString)
+ writer.open()
+ states.foreach(state => writer.putOne(State.serialize(state)))
+ writer.close()
+
+ val deserializedStates =
+ stateDocument.get().toList.map(State.deserialize).sortBy(_("loop_counter").asInstanceOf[Long])
+ assert(deserializedStates.length == states.length)
+ deserializedStates.zip(states).foreach {
+ case (actual, expected) =>
+ assert(actual("loop_counter") == expected("loop_counter").asInstanceOf[Int].toLong)
+ assert(actual("i") == expected("i").asInstanceOf[Int].toLong)
+ assert(
+ actual("payload")
+ .asInstanceOf[Array[Byte]]
+ .sameElements(expected("payload").asInstanceOf[Array[Byte]])
+ )
+ }
+ assert(
+ deserializedStates(1)("nested").asInstanceOf[Map[String, Any]]("values") == List(3L, 4L)
+ )
+ }
+
/** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
Proxy