This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
     new af0c0ea9f0 fix fmt
af0c0ea9f0 is described below

commit af0c0ea9f0110646e85db39baa1c8d7c884556a8
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Apr 18 01:44:21 2026 -0700

    fix fmt
---
 .../core/architecture/packaging/output_manager.py  |   6 +-
 amber/src/main/python/core/models/state.py         |  86 ++++++++++++++
 amber/src/main/python/core/runnables/main_loop.py  |   8 +-
 .../main/python/core/runnables/network_receiver.py |   2 +-
 .../main/python/core/runnables/network_sender.py   |   4 +-
 .../input_port_materialization_reader_runnable.py  |   5 +-
 .../python/core/storage/state_materialization.py   | 105 -----------------
 .../messaginglayer/OutputManager.scala             |   7 +-
 .../pythonworker/PythonProxyClient.scala           |   4 +-
 .../pythonworker/PythonProxyServer.scala           |   4 +-
 .../scheduling/RegionExecutionCoordinator.scala    |   8 +-
 .../InputPortMaterializationReaderThread.scala     |   6 +-
 .../amber/core/state/StateMaterialization.scala    | 130 ---------------------
 .../apache/texera/amber/core/state/package.scala   | 104 +++++++++++++++++
 14 files changed, 215 insertions(+), 264 deletions(-)

diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index e3e500200b..eae79fb607 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -44,13 +44,13 @@ from core.architecture.sendsemantics.round_robin_partitioner import (
 )
 from core.models import Tuple, Schema, StateFrame
 from core.models.payload import DataPayload, DataFrame
-from core.models.state import State
-from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
+from core.models.state import (
+    State,
     STATE_SCHEMA,
     serialize_state,
     state_uri_from_result_uri,
 )
+from core.storage.document_factory import DocumentFactory
 from core.storage.runnables.port_storage_writer import (
     PortStorageWriter,
     PortStorageWriterElement,
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 1b8cb29888..7ff3daf2f4 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -15,6 +15,92 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import base64
+import json
+import pickle
 from typing import Any, Dict, TypeAlias
 
+from . import Schema, Tuple
+from .schema import AttributeType
+
 State: TypeAlias = Dict[str, Any]
+
+SERIALIZED_STATE_CONTENT = "serialized_state_content"  # JSON payload column (all keys but the loop counter)
+LOOP_COUNTER = "loop_counter"  # loop counter column, kept out of the JSON payload
+_TYPE_MARKER = "__texera_type__"  # tag key; same string as Scala BytesTypeMarker
+_PAYLOAD_MARKER = "payload"  # key holding the base64 data of a tagged value
+_BYTES_TYPE = "bytes"  # tag for raw bytes; also decoded by the Scala side
+_PYTHON_PICKLE_TYPE = "python_pickle"  # tag for pickled objects; Scala passes these through as plain maps
+
+STATE_SCHEMA = Schema()  # two-column row schema; must match Scala stateMaterializationSchema
+STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
+STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
+
+
+def state_uri_from_result_uri(result_uri: str) -> str:  # Derive the state document URI from a result URI.
+    return result_uri.replace("/result", "/state")  # NOTE(review): replaces EVERY "/result" substring; must stay identical to Scala stateUriFromResultUri
+
+
+def serialize_state(state: State) -> Tuple:  # Thin wrapper: delegates to serialize_state_dict.
+    return serialize_state_dict(state)
+
+
+def serialize_state_dict(state_dict: State) -> Tuple:  # Encode a state dict as one STATE_SCHEMA row.
+    loop_counter = int(state_dict.get(LOOP_COUNTER, 0))  # absent counter defaults to 0
+    payload_dict = dict(state_dict)  # shallow copy so the caller's dict is not mutated
+    payload_dict.pop(LOOP_COUNTER, None)  # the counter is stored in its own column
+    return Tuple(
+        {
+            SERIALIZED_STATE_CONTENT: dumps_payload(payload_dict),
+            LOOP_COUNTER: loop_counter,
+        },
+        schema=STATE_SCHEMA,
+    )
+
+
+def deserialize_state(row: Tuple) -> State:  # Inverse of serialize_state_dict.
+    serialized_content = row[SERIALIZED_STATE_CONTENT] or "{}"  # null/empty content reads as {}
+    state_dict = loads_payload(serialized_content)  # NOTE(review): a non-object payload fails on the next line; Scala loadsPayload returns an empty map instead
+    state_dict[LOOP_COUNTER] = int(row[LOOP_COUNTER])  # NOTE(review): int(None) raises, while Scala toLong maps null to 0
+    return state_dict
+
+
+def dumps_payload(payload: State) -> str:  # Compact JSON after tagging non-JSON values.
+    return json.dumps(_normalize_for_json(payload), separators=(",", ":"))  # NOTE(review): non-finite floats are written as NaN/Infinity; confirm the Scala objectMapper accepts them
+
+
+def loads_payload(serialized_payload: str) -> State:  # Parse JSON and restore tagged values.
+    return _denormalize_from_json(json.loads(serialized_payload or "{}"))  # empty/None input parses as {}
+
+
+def _normalize_for_json(value: Any) -> Any:  # Recursively map a value onto JSON-safe types.
+    if value is None or isinstance(value, (bool, int, float, str)):  # JSON scalars pass through
+        return value
+    if isinstance(value, bytes):  # bytes -> tagged base64 object
+        return {
+            _TYPE_MARKER: _BYTES_TYPE,
+            _PAYLOAD_MARKER: base64.b64encode(value).decode("ascii"),
+        }
+    if isinstance(value, dict):  # NOTE: keys coerced with str(), so e.g. 1 and "1" collide
+        return {str(key): _normalize_for_json(inner) for key, inner in 
value.items()}
+    if isinstance(value, (list, tuple)):  # tuples come back as lists
+        return [_normalize_for_json(inner) for inner in value]
+    return {  # fallback: any other object is pickled and tagged
+        _TYPE_MARKER: _PYTHON_PICKLE_TYPE,
+        _PAYLOAD_MARKER: base64.b64encode(pickle.dumps(value)).decode("ascii"),
+    }
+
+
+def _denormalize_from_json(value: Any) -> Any:  # Inverse of _normalize_for_json (tuples stay lists).
+    if isinstance(value, list):
+        return [_denormalize_from_json(inner) for inner in value]
+    if isinstance(value, dict):
+        marker = value.get(_TYPE_MARKER)  # None for ordinary dicts
+        if marker == _BYTES_TYPE:
+            return base64.b64decode(value[_PAYLOAD_MARKER])
+        if marker == _PYTHON_PICKLE_TYPE:
+            return pickle.loads(base64.b64decode(value[_PAYLOAD_MARKER]))  # NOTE(review): unpickling can execute code; only safe if state storage is trusted
+        return {
+            key: _denormalize_from_json(inner) for key, inner in value.items()
+        }
+    return value
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 9152868c01..910e66e22e 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -39,14 +39,14 @@ from core.models.internal_queue import (
     InternalQueueElement,
 )
 from core.models.operator import LoopEndOperator, LoopStartOperator
-from core.models.state import State
-from core.runnables.data_processor import DataProcessor
-from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
+from core.models.state import (
+    State,
     STATE_SCHEMA,
     serialize_state_dict,
     state_uri_from_result_uri,
 )
+from core.runnables.data_processor import DataProcessor
+from core.storage.document_factory import DocumentFactory
 from core.util import StoppableQueueBlockingRunnable, get_one_of
 from core.util.console_message.timestamp import current_time_in_local_timezone
 from core.util.customized_queue.queue_base import QueueElement
diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py
index d358f8a8d5..e1815b08f7 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -42,7 +42,7 @@ from core.models.internal_queue import (
     ECMElement,
 )
 from core.proxy import ProxyServer
-from core.storage.state_materialization import STATE_SCHEMA, deserialize_state
+from core.models.state import STATE_SCHEMA, deserialize_state
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
 from proto.org.apache.texera.amber.engine.architecture.rpc import 
EmbeddedControlMessage
diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py
index a6dd9cc6d2..baf503c1f5 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -27,13 +27,13 @@ from core.models.internal_queue import (
     DCMElement,
     ECMElement,
 )
-from core.proxy import ProxyClient
-from core.storage.state_materialization import (
+from core.models.state import (
     SERIALIZED_STATE_CONTENT,
     LOOP_COUNTER,
     STATE_SCHEMA,
     serialize_state_dict,
 )
+from core.proxy import ProxyClient
 from core.util import StoppableQueueBlockingRunnable
 from proto.org.apache.texera.amber.core import ChannelIdentity
 from proto.org.apache.texera.amber.engine.architecture.rpc import 
EmbeddedControlMessage
diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index c5f81e7371..a66a2ef7c8 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -36,11 +36,8 @@ from core.architecture.sendsemantics.round_robin_partitioner import (
 )
 from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, 
StateFrame
 from core.models.internal_queue import DataElement, ECMElement
+from core.models.state import deserialize_state, state_uri_from_result_uri
 from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
-    deserialize_state,
-    state_uri_from_result_uri,
-)
 from core.util import Stoppable, get_one_of
 from core.util.runnable.runnable import Runnable
 from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
diff --git a/amber/src/main/python/core/storage/state_materialization.py b/amber/src/main/python/core/storage/state_materialization.py
deleted file mode 100644
index b7033daf8d..0000000000
--- a/amber/src/main/python/core/storage/state_materialization.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import base64
-import json
-import pickle
-from typing import Any, Dict
-
-from core.models import Schema, Tuple
-from core.models.schema import AttributeType
-from core.models.state import State
-
-SERIALIZED_STATE_CONTENT = "serialized_state_content"
-LOOP_COUNTER = "loop_counter"
-_TYPE_MARKER = "__texera_type__"
-_PAYLOAD_MARKER = "payload"
-_BYTES_TYPE = "bytes"
-_PYTHON_PICKLE_TYPE = "python_pickle"
-
-STATE_SCHEMA = Schema()
-STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
-STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
-
-
-def state_uri_from_result_uri(result_uri: str) -> str:
-    return result_uri.replace("/result", "/state")
-
-
-def serialize_state(state: State) -> Tuple:
-    return serialize_state_dict(state)
-
-
-def serialize_state_dict(state_dict: State) -> Tuple:
-    loop_counter = int(state_dict.get(LOOP_COUNTER, 0))
-    payload_dict = dict(state_dict)
-    payload_dict.pop(LOOP_COUNTER, None)
-    return Tuple(
-        {
-            SERIALIZED_STATE_CONTENT: dumps_payload(payload_dict),
-            LOOP_COUNTER: loop_counter,
-        },
-        schema=STATE_SCHEMA,
-    )
-
-
-def deserialize_state(row: Tuple) -> State:
-    serialized_content = row[SERIALIZED_STATE_CONTENT] or "{}"
-    state_dict = loads_payload(serialized_content)
-    state_dict[LOOP_COUNTER] = int(row[LOOP_COUNTER])
-    return state_dict
-
-
-def dumps_payload(payload: State) -> str:
-    return json.dumps(_normalize_for_json(payload), separators=(",", ":"))
-
-
-def loads_payload(serialized_payload: str) -> State:
-    return _denormalize_from_json(json.loads(serialized_payload or "{}"))
-
-
-def _normalize_for_json(value: Any) -> Any:
-    if value is None or isinstance(value, (bool, int, float, str)):
-        return value
-    if isinstance(value, bytes):
-        return {
-            _TYPE_MARKER: _BYTES_TYPE,
-            _PAYLOAD_MARKER: base64.b64encode(value).decode("ascii"),
-        }
-    if isinstance(value, dict):
-        return {str(key): _normalize_for_json(inner) for key, inner in 
value.items()}
-    if isinstance(value, (list, tuple)):
-        return [_normalize_for_json(inner) for inner in value]
-    return {
-        _TYPE_MARKER: _PYTHON_PICKLE_TYPE,
-        _PAYLOAD_MARKER: base64.b64encode(pickle.dumps(value)).decode("ascii"),
-    }
-
-
-def _denormalize_from_json(value: Any) -> Any:
-    if isinstance(value, list):
-        return [_denormalize_from_json(inner) for inner in value]
-    if isinstance(value, dict):
-        marker = value.get(_TYPE_MARKER)
-        if marker == _BYTES_TYPE:
-            return base64.b64decode(value[_PAYLOAD_MARKER])
-        if marker == _PYTHON_PICKLE_TYPE:
-            return pickle.loads(base64.b64decode(value[_PAYLOAD_MARKER]))
-        return {
-            key: _denormalize_from_json(inner) for key, inner in value.items()
-        }
-    return value
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index c62887ae08..a017df6e29 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -19,8 +19,7 @@
 
 package org.apache.texera.amber.engine.architecture.messaginglayer
 
-import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{State, serializeState, 
stateUriFromResultUri}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.BufferedItemWriter
 import org.apache.texera.amber.core.tuple._
@@ -240,11 +239,11 @@ class OutputManager(
       storageUris.foreach {
         case (_, uri) =>
           val writer = DocumentFactory
-            .openDocument(StateMaterialization.stateUriFromResultUri(uri))
+            .openDocument(stateUriFromResultUri(uri))
             ._1
             .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
             .asInstanceOf[BufferedItemWriter[Tuple]]
-          writer.putOne(StateMaterialization.serializeState(state))
+          writer.putOne(serializeState(state))
           writer.close()
       }
     } catch {
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
index 7515b82d30..d870c71e4d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.pythonworker
 
 import com.twitter.util.{Await, Promise}
 import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.serializeStateMap
 import org.apache.texera.amber.core.tuple.{Schema, Tuple}
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import 
org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueue.{
@@ -127,7 +127,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu
         writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), 
from, "Data")
       case StateFrame(state) =>
         writeArrowStream(
-          mutable.Queue(StateMaterialization.serializeStateMap(state)),
+          mutable.Queue(serializeStateMap(state)),
           from,
           "State"
         )
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
index e3c9ed569c..302c126c45 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.pythonworker
 
 import com.google.common.primitives.Longs
 import com.twitter.util.Promise
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.deserializeState
 import org.apache.texera.amber.core.tuple.Tuple
 import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, 
ChannelIdentity}
 import 
org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
@@ -130,7 +130,7 @@ private class AmberProducer(
         assert(root.getRowCount == 1)
         outputPort.sendTo(
           to,
-          
StateFrame(StateMaterialization.deserializeState(ArrowUtils.getTexeraTuple(0, 
root)))
+          StateFrame(deserializeState(ArrowUtils.getTexeraTuple(0, root)))
         )
       case "ECM" =>
         assert(root.getRowCount == 1)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 1f70f9c510..d851e4aab5 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
 
 import org.apache.pekko.pattern.gracefulStop
 import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, 
Return, Throw, Timer}
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{stateMaterializationSchema, 
stateUriFromResultUri}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
 import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -567,7 +567,7 @@ class RegionExecutionCoordinator(
     portConfigs.foreach {
       case (outputPortId, portConfig) =>
         val storageUriToAdd = portConfig.storageURI
-        val stateUriToAdd = 
StateMaterialization.stateUriFromResultUri(storageUriToAdd)
+          val stateUriToAdd = stateUriFromResultUri(storageUriToAdd)
         val (_, eid, _, _) = decodeURI(storageUriToAdd)
         val schemaOptional =
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
@@ -585,11 +585,11 @@ class RegionExecutionCoordinator(
             DocumentFactory.openDocument(stateUriToAdd)
           } catch {
             case _: Exception =>
-              DocumentFactory.createDocument(stateUriToAdd, 
StateMaterialization.schema)
+                DocumentFactory.createDocument(stateUriToAdd, 
stateMaterializationSchema)
           }
         } else {
           DocumentFactory.createDocument(storageUriToAdd, schema)
-          DocumentFactory.createDocument(stateUriToAdd, 
StateMaterialization.schema)
+                DocumentFactory.createDocument(stateUriToAdd, 
stateMaterializationSchema)
         }
 
         WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 6868ff3eec..17bf82a90d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers
 
 import io.grpc.MethodDescriptor
 import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{deserializeState, 
stateUriFromResultUri}
 import org.apache.texera.amber.core.storage.DocumentFactory
 import org.apache.texera.amber.core.storage.model.VirtualDocument
 import org.apache.texera.amber.core.tuple.Tuple
@@ -115,13 +115,13 @@ class InputPortMaterializationReaderThread(
       try {
         val state_document =
           DocumentFactory
-            .openDocument(StateMaterialization.stateUriFromResultUri(uri))
+            .openDocument(stateUriFromResultUri(uri))
             ._1
             .asInstanceOf[VirtualDocument[Tuple]]
         val stateReadIterator = state_document.get()
 
         while (stateReadIterator.hasNext) {
-          val state = 
StateMaterialization.deserializeState(stateReadIterator.next())
+          val state = deserializeState(stateReadIterator.next())
           inputMessageQueue.put(
             FIFOMessageElement(WorkflowFIFOMessage(channelId, 
getSequenceNumber, StateFrame(state)))
           )
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala
deleted file mode 100644
index f1a0fc51e7..0000000000
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.core.state
-
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
-
-import java.net.URI
-import java.util.Base64
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-object StateMaterialization {
-  val SerializedStateContentColumn = "serialized_state_content"
-  val LoopCounterColumn = "loop_counter"
-  private val BytesTypeMarker = "__texera_type__"
-  private val BytesValue = "bytes"
-  private val PayloadMarker = "payload"
-
-  val schema: Schema = new Schema(
-    new Attribute(SerializedStateContentColumn, AttributeType.STRING),
-    new Attribute(LoopCounterColumn, AttributeType.LONG)
-  )
-
-  def stateUriFromResultUri(resultUri: URI): URI = {
-    new URI(resultUri.toString.replace("/result", "/state"))
-  }
-
-  def serializeState(state: State): Tuple = {
-    serializeStateMap(state)
-  }
-
-  def serializeStateMap(state: Map[String, Any]): Tuple = {
-    val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L)
-    val payloadJson = dumpsPayload(state.removed(LoopCounterColumn))
-
-    Tuple
-      .builder(schema)
-      .addSequentially(Array(payloadJson, loopCounter))
-      .build()
-  }
-
-  def deserializeState(tuple: Tuple): State = {
-    deserializeStateMap(tuple)
-  }
-
-  def deserializeStateMap(tuple: Tuple): Map[String, Any] = {
-    val serializedContent =
-      
Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
-    val loopCounter = toLong(tuple.getField[Any](LoopCounterColumn))
-    loadsPayload(serializedContent) + (LoopCounterColumn -> loopCounter)
-  }
-
-  def dumpsPayload(payload: Map[String, Any]): String = {
-    
objectMapper.writeValueAsString(payload.view.mapValues(normalizeForJson).toMap)
-  }
-
-  def loadsPayload(serializedPayload: String): Map[String, Any] = {
-    val root = objectMapper.readTree(Option(serializedPayload).getOrElse("{}"))
-    if (root == null || !root.isObject) {
-      Map.empty
-    } else {
-      root.fields().asScala.map(entry => entry.getKey -> 
denormalizeFromJson(entry.getValue)).toMap
-    }
-  }
-
-  private def normalizeForJson(value: Any): Any = {
-    value match {
-      case null               => null
-      case bytes: Array[Byte] =>
-        Map(BytesTypeMarker -> BytesValue, PayloadMarker -> 
Base64.getEncoder.encodeToString(bytes))
-      case map: Map[_, _] =>
-        map.iterator.map { case (k, v) => k.toString -> normalizeForJson(v) 
}.toMap
-      case iterable: Iterable[_] =>
-        iterable.map(normalizeForJson).toList
-      case other              => other
-    }
-  }
-
-  private def denormalizeFromJson(node: JsonNode): Any = {
-    if (node == null || node.isNull) {
-      null
-    } else if (node.isObject) {
-      val fields = node.fields().asScala.map(entry => entry.getKey -> 
entry.getValue).toMap
-      fields.get(BytesTypeMarker) match {
-        case Some(typeNode) if typeNode.isTextual && typeNode.asText() == 
BytesValue =>
-          Base64.getDecoder.decode(fields(PayloadMarker).asText())
-        case _ =>
-          fields.view.mapValues(denormalizeFromJson).toMap
-      }
-    } else if (node.isArray) {
-      node.elements().asScala.map(denormalizeFromJson).toList
-    } else if (node.isBoolean) {
-      node.asBoolean()
-    } else if (node.isIntegralNumber) {
-      node.longValue()
-    } else if (node.isFloatingPointNumber) {
-      node.doubleValue()
-    } else {
-      node.asText()
-    }
-  }
-
-  private def toLong(value: Any): Long = {
-    value match {
-      case null                     => 0L
-      case number: java.lang.Number => number.longValue()
-      case text: String             => text.toLong
-      case other =>
-        throw new IllegalArgumentException(s"Cannot convert $other to loop 
counter")
-    }
-  }
-}
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
index c110f9d814..14acad3455 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
@@ -19,6 +19,110 @@
 
 package org.apache.texera.amber.core
 
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, 
Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.net.URI
+import java.util.Base64
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
 package object state {
   type State = Map[String, Any]
+
+  val SerializedStateContentColumn = "serialized_state_content" // JSON payload column (all keys except the loop counter)
+  val LoopCounterColumn = "loop_counter" // loop counter column, kept out of the JSON payload
+  private[state] val BytesTypeMarker = "__texera_type__" // same tag key as the Python _TYPE_MARKER
+  private[state] val BytesValue = "bytes" // tag value for base64-encoded byte arrays
+  private[state] val PayloadMarker = "payload" // key holding the base64 data of a tagged value
+
+  val stateMaterializationSchema: Schema = new Schema( // must stay identical to the Python STATE_SCHEMA
+    new Attribute(SerializedStateContentColumn, AttributeType.STRING),
+    new Attribute(LoopCounterColumn, AttributeType.LONG)
+  )
+
+  def stateUriFromResultUri(resultUri: URI): URI = { // state document URI derived from a port's result URI
+    new URI(resultUri.toString.replace("/result", "/state")) // NOTE(review): replaces EVERY "/result" substring; must match Python state_uri_from_result_uri
+  }
+
+  def serializeState(state: State): Tuple = { // thin wrapper over serializeStateMap
+    serializeStateMap(state)
+  }
+
+  def serializeStateMap(state: Map[String, Any]): Tuple = { // one row: (JSON payload, loop counter)
+    val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L) // absent counter -> 0
+    val payloadJson = dumpsPayload(state.removed(LoopCounterColumn))
+
+    Tuple
+      .builder(stateMaterializationSchema)
+      .addSequentially(Array(payloadJson, loopCounter))
+      .build()
+  }
+
+  def deserializeState(tuple: Tuple): State = { // inverse of serializeStateMap; null content reads as "{}"
+    val serializedContent =
+      
Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
+    val loopCounter = toLong(tuple.getField[Any](LoopCounterColumn)) // null -> 0 via toLong
+    loadsPayload(serializedContent) + (LoopCounterColumn -> loopCounter)
+  }
+
+  def dumpsPayload(payload: Map[String, Any]): String = { // tags byte arrays, then writes JSON
+    
objectMapper.writeValueAsString(payload.view.mapValues(normalizeForJson).toMap)
+  }
+
+  def loadsPayload(serializedPayload: String): Map[String, Any] = { // null input reads as "{}"; non-object JSON -> empty map
+    val root = objectMapper.readTree(Option(serializedPayload).getOrElse("{}"))
+    if (root == null || !root.isObject) {
+      Map.empty
+    } else {
+      root.fields().asScala.map(entry => entry.getKey -> 
denormalizeFromJson(entry.getValue)).toMap
+    }
+  }
+
+  private def normalizeForJson(value: Any): Any = { // byte arrays -> tagged base64; maps/iterables recursed
+    value match {
+      case null               => null
+      case bytes: Array[Byte] =>
+        Map(BytesTypeMarker -> BytesValue, PayloadMarker -> 
Base64.getEncoder.encodeToString(bytes))
+      case map: Map[_, _]     =>
+        map.iterator.map { case (k, v) => k.toString -> normalizeForJson(v) 
}.toMap
+      case iterable: Iterable[_] =>
+        iterable.map(normalizeForJson).toList
+      case other              => other // NOTE(review): left to Jackson's default serialization - confirm supported types
+    }
+  }
+
+  private def denormalizeFromJson(node: JsonNode): Any = { // inverse of normalizeForJson; Python pickle tags stay plain maps
+    if (node == null || node.isNull) {
+      null
+    } else if (node.isObject) {
+      val fields = node.fields().asScala.map(entry => entry.getKey -> 
entry.getValue).toMap
+      fields.get(BytesTypeMarker) match {
+        case Some(typeNode) if typeNode.isTextual && typeNode.asText() == 
BytesValue =>
+          Base64.getDecoder.decode(fields(PayloadMarker).asText())
+        case _ =>
+          fields.view.mapValues(denormalizeFromJson).toMap
+      }
+    } else if (node.isArray) {
+      node.elements().asScala.map(denormalizeFromJson).toList
+    } else if (node.isBoolean) {
+      node.asBoolean()
+    } else if (node.isIntegralNumber) {
+      node.longValue()
+    } else if (node.isFloatingPointNumber) {
+      node.doubleValue()
+    } else {
+      node.asText()
+    }
+  }
+
+  private def toLong(value: Any): Long = { // null -> 0, Number -> longValue, String -> parsed
+    value match {
+      case null                     => 0L
+      case number: java.lang.Number => number.longValue()
+      case text: String             => text.toLong
+      case other =>
+        throw new IllegalArgumentException(s"Cannot convert $other to loop 
counter")
+    }
+  }
 }

Reply via email to