This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-state-only
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-state-only by this 
push:
     new d92ed51b1c fix fmt
d92ed51b1c is described below

commit d92ed51b1cdb65effe962e65f6cb1138a97a383f
Author: Xinyuan Lin <[email protected]>
AuthorDate: Thu Apr 30 18:01:53 2026 -0700

    fix fmt
---
 amber/src/main/python/core/models/state.py         | 28 ++++++++++++----------
 .../main/python/core/runnables/network_receiver.py | 15 +++---------
 .../main/python/core/runnables/network_sender.py   | 14 +++--------
 3 files changed, 21 insertions(+), 36 deletions(-)

diff --git a/amber/src/main/python/core/models/state.py 
b/amber/src/main/python/core/models/state.py
index 8603c2124f..003aaa212a 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -24,25 +24,27 @@ from .tuple import Tuple
 
 
 class State(dict):
-    pass
+    CONTENT = "content"
+    SCHEMA = Schema(raw_schema={CONTENT: "STRING"})
 
-STATE_CONTENT = "content"
-_TYPE_MARKER = "__texera_type__"
-_PAYLOAD_MARKER = "payload"
-_BYTES_TYPE = "bytes"
+    def to_json(self) -> str:
+        return json.dumps(_to_json_value(self), separators=(",", ":"))
 
-STATE_SCHEMA = Schema(raw_schema={STATE_CONTENT: "STRING"})
+    def to_tuple(self) -> Tuple:
+        return Tuple({State.CONTENT: self.to_json()}, schema=State.SCHEMA)
 
+    @classmethod
+    def from_json(cls, payload: str) -> "State":
+        return cls(_from_json_value(json.loads(payload)))
 
-def serialize_state(state: State) -> Tuple:
-    return Tuple(
-        {STATE_CONTENT: json.dumps(_to_json_value(state), separators=(",", 
":"))},
-        schema=STATE_SCHEMA,
-    )
+    @classmethod
+    def from_tuple(cls, row: Tuple) -> "State":
+        return cls.from_json(row[cls.CONTENT])
 
 
-def deserialize_state(row: Tuple) -> State:
-    return State(_from_json_value(json.loads(row[STATE_CONTENT])))
+_TYPE_MARKER = "__texera_type__"
+_PAYLOAD_MARKER = "payload"
+_BYTES_TYPE = "bytes"
 
 
 def _to_json_value(value: Any) -> Any:
diff --git a/amber/src/main/python/core/runnables/network_receiver.py 
b/amber/src/main/python/core/runnables/network_receiver.py
index 8cd8a0d537..739cf0788e 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -32,7 +32,7 @@ from 
core.architecture.handlers.actorcommand.credit_update_handler import (
 )
 from core.models import (
     DataFrame,
-    Tuple,
+    State,
     StateFrame,
 )
 from core.models.internal_queue import (
@@ -42,7 +42,6 @@ from core.models.internal_queue import (
     ECMElement,
 )
 from core.proxy import ProxyServer
-from core.models.state import STATE_SCHEMA, deserialize_state
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
 from proto.org.apache.texera.amber.engine.architecture.rpc import 
EmbeddedControlMessage
@@ -146,20 +145,12 @@ class NetworkReceiver(Runnable, Stoppable):
         
self._proxy_server.register_actor_message_handler(actor_message_handler)
 
     @staticmethod
-    def _deserialize_state_payload(table: Table) -> dict:
+    def _deserialize_state_payload(table: Table) -> State:
         # Each network State message carries exactly one serialized state row.
         # Multiple states are sent as multiple State messages, not as multiple
         # rows inside one network payload.
         assert len(table) == 1
-        return deserialize_state(
-            Tuple(
-                {
-                    name: table[name][0].as_py()
-                    for name in STATE_SCHEMA.get_attr_names()
-                },
-                schema=STATE_SCHEMA,
-            )
-        )
+        return State.from_json(table[State.CONTENT][0].as_py())
 
     def register_shutdown(self, shutdown: callable) -> None:
         self._proxy_server.register(
diff --git a/amber/src/main/python/core/runnables/network_sender.py 
b/amber/src/main/python/core/runnables/network_sender.py
index 52d799d6f1..d8e3889ac1 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -20,18 +20,13 @@ from loguru import logger
 from overrides import overrides
 from typing import Optional
 
-from core.models import DataPayload, InternalQueue, DataFrame, StateFrame
+from core.models import DataPayload, InternalQueue, DataFrame, State, 
StateFrame
 from core.models.internal_queue import (
     InternalQueueElement,
     DataElement,
     DCMElement,
     ECMElement,
 )
-from core.models.state import (
-    STATE_CONTENT,
-    STATE_SCHEMA,
-    serialize_state,
-)
 from core.proxy import ProxyClient
 from core.util import StoppableQueueBlockingRunnable
 from proto.org.apache.texera.amber.core import ChannelIdentity
@@ -104,12 +99,9 @@ class NetworkSender(StoppableQueueBlockingRunnable):
             self._proxy_client.send_data(bytes(data_header), 
data_payload.frame)
         elif isinstance(data_payload, StateFrame):
             data_header = PythonDataHeader(tag=to, payload_type="State")
-            serialized_state = serialize_state(data_payload.frame)
             table = pa.Table.from_pydict(
-                {
-                    STATE_CONTENT: [serialized_state[STATE_CONTENT]],
-                },
-                schema=STATE_SCHEMA.as_arrow_schema(),
+                {State.CONTENT: [data_payload.frame.to_json()]},
+                schema=State.SCHEMA.as_arrow_schema(),
             )
             self._proxy_client.send_data(bytes(data_header), table)
         else:

Reply via email to