This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch xinyuan-loop-feb
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/xinyuan-loop-feb by this push:
new af0c0ea9f0 fix fmt
af0c0ea9f0 is described below
commit af0c0ea9f0110646e85db39baa1c8d7c884556a8
Author: Xinyuan Lin <[email protected]>
AuthorDate: Sat Apr 18 01:44:21 2026 -0700
fix fmt
---
.../core/architecture/packaging/output_manager.py | 6 +-
amber/src/main/python/core/models/state.py | 86 ++++++++++++++
amber/src/main/python/core/runnables/main_loop.py | 8 +-
.../main/python/core/runnables/network_receiver.py | 2 +-
.../main/python/core/runnables/network_sender.py | 4 +-
.../input_port_materialization_reader_runnable.py | 5 +-
.../python/core/storage/state_materialization.py | 105 -----------------
.../messaginglayer/OutputManager.scala | 7 +-
.../pythonworker/PythonProxyClient.scala | 4 +-
.../pythonworker/PythonProxyServer.scala | 4 +-
.../scheduling/RegionExecutionCoordinator.scala | 8 +-
.../InputPortMaterializationReaderThread.scala | 6 +-
.../amber/core/state/StateMaterialization.scala | 130 ---------------------
.../apache/texera/amber/core/state/package.scala | 104 +++++++++++++++++
14 files changed, 215 insertions(+), 264 deletions(-)
diff --git a/amber/src/main/python/core/architecture/packaging/output_manager.py b/amber/src/main/python/core/architecture/packaging/output_manager.py
index e3e500200b..eae79fb607 100644
--- a/amber/src/main/python/core/architecture/packaging/output_manager.py
+++ b/amber/src/main/python/core/architecture/packaging/output_manager.py
@@ -44,13 +44,13 @@ from core.architecture.sendsemantics.round_robin_partitioner import (
)
from core.models import Tuple, Schema, StateFrame
from core.models.payload import DataPayload, DataFrame
-from core.models.state import State
-from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
+from core.models.state import (
+ State,
STATE_SCHEMA,
serialize_state,
state_uri_from_result_uri,
)
+from core.storage.document_factory import DocumentFactory
from core.storage.runnables.port_storage_writer import (
PortStorageWriter,
PortStorageWriterElement,
diff --git a/amber/src/main/python/core/models/state.py b/amber/src/main/python/core/models/state.py
index 1b8cb29888..7ff3daf2f4 100644
--- a/amber/src/main/python/core/models/state.py
+++ b/amber/src/main/python/core/models/state.py
@@ -15,6 +15,92 @@
# specific language governing permissions and limitations
# under the License.
+import base64
+import json
+import pickle
from typing import Any, Dict, TypeAlias
+from . import Schema, Tuple
+from .schema import AttributeType
+
State: TypeAlias = Dict[str, Any]
+
+SERIALIZED_STATE_CONTENT = "serialized_state_content"
+LOOP_COUNTER = "loop_counter"
+_TYPE_MARKER = "__texera_type__"
+_PAYLOAD_MARKER = "payload"
+_BYTES_TYPE = "bytes"
+_PYTHON_PICKLE_TYPE = "python_pickle"
+
+STATE_SCHEMA = Schema()
+STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
+STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
+
+
+def state_uri_from_result_uri(result_uri: str) -> str:
+ return result_uri.replace("/result", "/state")
+
+
+def serialize_state(state: State) -> Tuple:
+ return serialize_state_dict(state)
+
+
+def serialize_state_dict(state_dict: State) -> Tuple:
+ loop_counter = int(state_dict.get(LOOP_COUNTER, 0))
+ payload_dict = dict(state_dict)
+ payload_dict.pop(LOOP_COUNTER, None)
+ return Tuple(
+ {
+ SERIALIZED_STATE_CONTENT: dumps_payload(payload_dict),
+ LOOP_COUNTER: loop_counter,
+ },
+ schema=STATE_SCHEMA,
+ )
+
+
+def deserialize_state(row: Tuple) -> State:
+ serialized_content = row[SERIALIZED_STATE_CONTENT] or "{}"
+ state_dict = loads_payload(serialized_content)
+ state_dict[LOOP_COUNTER] = int(row[LOOP_COUNTER])
+ return state_dict
+
+
+def dumps_payload(payload: State) -> str:
+ return json.dumps(_normalize_for_json(payload), separators=(",", ":"))
+
+
+def loads_payload(serialized_payload: str) -> State:
+ return _denormalize_from_json(json.loads(serialized_payload or "{}"))
+
+
+def _normalize_for_json(value: Any) -> Any:
+ if value is None or isinstance(value, (bool, int, float, str)):
+ return value
+ if isinstance(value, bytes):
+ return {
+ _TYPE_MARKER: _BYTES_TYPE,
+ _PAYLOAD_MARKER: base64.b64encode(value).decode("ascii"),
+ }
+ if isinstance(value, dict):
+ return {str(key): _normalize_for_json(inner) for key, inner in value.items()}
+ if isinstance(value, (list, tuple)):
+ return [_normalize_for_json(inner) for inner in value]
+ return {
+ _TYPE_MARKER: _PYTHON_PICKLE_TYPE,
+ _PAYLOAD_MARKER: base64.b64encode(pickle.dumps(value)).decode("ascii"),
+ }
+
+
+def _denormalize_from_json(value: Any) -> Any:
+ if isinstance(value, list):
+ return [_denormalize_from_json(inner) for inner in value]
+ if isinstance(value, dict):
+ marker = value.get(_TYPE_MARKER)
+ if marker == _BYTES_TYPE:
+ return base64.b64decode(value[_PAYLOAD_MARKER])
+ if marker == _PYTHON_PICKLE_TYPE:
+ return pickle.loads(base64.b64decode(value[_PAYLOAD_MARKER]))
+ return {
+ key: _denormalize_from_json(inner) for key, inner in value.items()
+ }
+ return value
diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py
index 9152868c01..910e66e22e 100644
--- a/amber/src/main/python/core/runnables/main_loop.py
+++ b/amber/src/main/python/core/runnables/main_loop.py
@@ -39,14 +39,14 @@ from core.models.internal_queue import (
InternalQueueElement,
)
from core.models.operator import LoopEndOperator, LoopStartOperator
-from core.models.state import State
-from core.runnables.data_processor import DataProcessor
-from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
+from core.models.state import (
+ State,
STATE_SCHEMA,
serialize_state_dict,
state_uri_from_result_uri,
)
+from core.runnables.data_processor import DataProcessor
+from core.storage.document_factory import DocumentFactory
from core.util import StoppableQueueBlockingRunnable, get_one_of
from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.customized_queue.queue_base import QueueElement
diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py
index d358f8a8d5..e1815b08f7 100644
--- a/amber/src/main/python/core/runnables/network_receiver.py
+++ b/amber/src/main/python/core/runnables/network_receiver.py
@@ -42,7 +42,7 @@ from core.models.internal_queue import (
ECMElement,
)
from core.proxy import ProxyServer
-from core.storage.state_materialization import STATE_SCHEMA, deserialize_state
+from core.models.state import STATE_SCHEMA, deserialize_state
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage
diff --git a/amber/src/main/python/core/runnables/network_sender.py b/amber/src/main/python/core/runnables/network_sender.py
index a6dd9cc6d2..baf503c1f5 100644
--- a/amber/src/main/python/core/runnables/network_sender.py
+++ b/amber/src/main/python/core/runnables/network_sender.py
@@ -27,13 +27,13 @@ from core.models.internal_queue import (
DCMElement,
ECMElement,
)
-from core.proxy import ProxyClient
-from core.storage.state_materialization import (
+from core.models.state import (
SERIALIZED_STATE_CONTENT,
LOOP_COUNTER,
STATE_SCHEMA,
serialize_state_dict,
)
+from core.proxy import ProxyClient
from core.util import StoppableQueueBlockingRunnable
from proto.org.apache.texera.amber.core import ChannelIdentity
from proto.org.apache.texera.amber.engine.architecture.rpc import EmbeddedControlMessage
diff --git a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
index c5f81e7371..a66a2ef7c8 100644
--- a/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
+++ b/amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py
@@ -36,11 +36,8 @@ from core.architecture.sendsemantics.round_robin_partitioner import (
)
from core.models import Tuple, InternalQueue, DataFrame, DataPayload, State, StateFrame
from core.models.internal_queue import DataElement, ECMElement
+from core.models.state import deserialize_state, state_uri_from_result_uri
from core.storage.document_factory import DocumentFactory
-from core.storage.state_materialization import (
- deserialize_state,
- state_uri_from_result_uri,
-)
from core.util import Stoppable, get_one_of
from core.util.runnable.runnable import Runnable
from core.util.virtual_identity import get_from_actor_id_for_input_port_storage
diff --git a/amber/src/main/python/core/storage/state_materialization.py b/amber/src/main/python/core/storage/state_materialization.py
deleted file mode 100644
index b7033daf8d..0000000000
--- a/amber/src/main/python/core/storage/state_materialization.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import base64
-import json
-import pickle
-from typing import Any, Dict
-
-from core.models import Schema, Tuple
-from core.models.schema import AttributeType
-from core.models.state import State
-
-SERIALIZED_STATE_CONTENT = "serialized_state_content"
-LOOP_COUNTER = "loop_counter"
-_TYPE_MARKER = "__texera_type__"
-_PAYLOAD_MARKER = "payload"
-_BYTES_TYPE = "bytes"
-_PYTHON_PICKLE_TYPE = "python_pickle"
-
-STATE_SCHEMA = Schema()
-STATE_SCHEMA.add(SERIALIZED_STATE_CONTENT, AttributeType.STRING)
-STATE_SCHEMA.add(LOOP_COUNTER, AttributeType.LONG)
-
-
-def state_uri_from_result_uri(result_uri: str) -> str:
- return result_uri.replace("/result", "/state")
-
-
-def serialize_state(state: State) -> Tuple:
- return serialize_state_dict(state)
-
-
-def serialize_state_dict(state_dict: State) -> Tuple:
- loop_counter = int(state_dict.get(LOOP_COUNTER, 0))
- payload_dict = dict(state_dict)
- payload_dict.pop(LOOP_COUNTER, None)
- return Tuple(
- {
- SERIALIZED_STATE_CONTENT: dumps_payload(payload_dict),
- LOOP_COUNTER: loop_counter,
- },
- schema=STATE_SCHEMA,
- )
-
-
-def deserialize_state(row: Tuple) -> State:
- serialized_content = row[SERIALIZED_STATE_CONTENT] or "{}"
- state_dict = loads_payload(serialized_content)
- state_dict[LOOP_COUNTER] = int(row[LOOP_COUNTER])
- return state_dict
-
-
-def dumps_payload(payload: State) -> str:
- return json.dumps(_normalize_for_json(payload), separators=(",", ":"))
-
-
-def loads_payload(serialized_payload: str) -> State:
- return _denormalize_from_json(json.loads(serialized_payload or "{}"))
-
-
-def _normalize_for_json(value: Any) -> Any:
- if value is None or isinstance(value, (bool, int, float, str)):
- return value
- if isinstance(value, bytes):
- return {
- _TYPE_MARKER: _BYTES_TYPE,
- _PAYLOAD_MARKER: base64.b64encode(value).decode("ascii"),
- }
- if isinstance(value, dict):
- return {str(key): _normalize_for_json(inner) for key, inner in value.items()}
- if isinstance(value, (list, tuple)):
- return [_normalize_for_json(inner) for inner in value]
- return {
- _TYPE_MARKER: _PYTHON_PICKLE_TYPE,
- _PAYLOAD_MARKER: base64.b64encode(pickle.dumps(value)).decode("ascii"),
- }
-
-
-def _denormalize_from_json(value: Any) -> Any:
- if isinstance(value, list):
- return [_denormalize_from_json(inner) for inner in value]
- if isinstance(value, dict):
- marker = value.get(_TYPE_MARKER)
- if marker == _BYTES_TYPE:
- return base64.b64decode(value[_PAYLOAD_MARKER])
- if marker == _PYTHON_PICKLE_TYPE:
- return pickle.loads(base64.b64decode(value[_PAYLOAD_MARKER]))
- return {
- key: _denormalize_from_json(inner) for key, inner in value.items()
- }
- return value
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
index c62887ae08..a017df6e29 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala
@@ -19,8 +19,7 @@
package org.apache.texera.amber.engine.architecture.messaginglayer
-import org.apache.texera.amber.core.state.State
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{State, serializeState, stateUriFromResultUri}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.tuple._
@@ -240,11 +239,11 @@ class OutputManager(
storageUris.foreach {
case (_, uri) =>
val writer = DocumentFactory
- .openDocument(StateMaterialization.stateUriFromResultUri(uri))
+ .openDocument(stateUriFromResultUri(uri))
._1
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
- writer.putOne(StateMaterialization.serializeState(state))
+ writer.putOne(serializeState(state))
writer.close()
}
} catch {
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
index 7515b82d30..d870c71e4d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.pythonworker
import com.twitter.util.{Await, Promise}
import org.apache.texera.amber.core.WorkflowRuntimeException
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.serializeStateMap
import org.apache.texera.amber.core.tuple.{Schema, Tuple}
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.engine.architecture.pythonworker.WorkerBatchInternalQueue.{
@@ -127,7 +127,7 @@ class PythonProxyClient(portNumberPromise: Promise[Int], val actorId: ActorVirtu
writeArrowStream(mutable.Queue(ArraySeq.unsafeWrapArray(frame): _*), from, "Data")
case StateFrame(state) =>
writeArrowStream(
- mutable.Queue(StateMaterialization.serializeStateMap(state)),
+ mutable.Queue(serializeStateMap(state)),
from,
"State"
)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
index e3c9ed569c..302c126c45 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyServer.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.pythonworker
import com.google.common.primitives.Longs
import com.twitter.util.Promise
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.deserializeState
import org.apache.texera.amber.core.tuple.Tuple
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
@@ -130,7 +130,7 @@ private class AmberProducer(
assert(root.getRowCount == 1)
outputPort.sendTo(
to,
- StateFrame(StateMaterialization.deserializeState(ArrowUtils.getTexeraTuple(0, root)))
+ StateFrame(deserializeState(ArrowUtils.getTexeraTuple(0, root)))
)
case "ECM" =>
assert(root.getRowCount == 1)
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
index 1f70f9c510..d851e4aab5 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling
import org.apache.pekko.pattern.gracefulStop
import com.twitter.util.{Duration => TwitterDuration, Future, JavaTimer, Return, Throw, Timer}
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{stateMaterializationSchema, stateUriFromResultUri}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.VFSURIFactory.decodeURI
import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
@@ -567,7 +567,7 @@ class RegionExecutionCoordinator(
portConfigs.foreach {
case (outputPortId, portConfig) =>
val storageUriToAdd = portConfig.storageURI
- val stateUriToAdd = StateMaterialization.stateUriFromResultUri(storageUriToAdd)
+ val stateUriToAdd = stateUriFromResultUri(storageUriToAdd)
val (_, eid, _, _) = decodeURI(storageUriToAdd)
val schemaOptional = region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
@@ -585,11 +585,11 @@ class RegionExecutionCoordinator(
DocumentFactory.openDocument(stateUriToAdd)
} catch {
case _: Exception =>
- DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
+ DocumentFactory.createDocument(stateUriToAdd, stateMaterializationSchema)
}
} else {
DocumentFactory.createDocument(storageUriToAdd, schema)
- DocumentFactory.createDocument(stateUriToAdd, StateMaterialization.schema)
+ DocumentFactory.createDocument(stateUriToAdd, stateMaterializationSchema)
}
WorkflowExecutionsResource.insertOperatorPortResultUri(
diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
index 6868ff3eec..17bf82a90d 100644
--- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
+++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala
@@ -21,7 +21,7 @@ package org.apache.texera.amber.engine.architecture.worker.managers
import io.grpc.MethodDescriptor
import org.apache.texera.amber.config.ApplicationConfig
-import org.apache.texera.amber.core.state.StateMaterialization
+import org.apache.texera.amber.core.state.{deserializeState, stateUriFromResultUri}
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.VirtualDocument
import org.apache.texera.amber.core.tuple.Tuple
@@ -115,13 +115,13 @@ class InputPortMaterializationReaderThread(
try {
val state_document =
DocumentFactory
- .openDocument(StateMaterialization.stateUriFromResultUri(uri))
+ .openDocument(stateUriFromResultUri(uri))
._1
.asInstanceOf[VirtualDocument[Tuple]]
val stateReadIterator = state_document.get()
while (stateReadIterator.hasNext) {
- val state = StateMaterialization.deserializeState(stateReadIterator.next())
+ val state = deserializeState(stateReadIterator.next())
inputMessageQueue.put(
FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))
)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala
deleted file mode 100644
index f1a0fc51e7..0000000000
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/StateMaterialization.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.texera.amber.core.state
-
-import com.fasterxml.jackson.databind.JsonNode
-import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
-import org.apache.texera.amber.util.JSONUtils.objectMapper
-
-import java.net.URI
-import java.util.Base64
-import scala.jdk.CollectionConverters.IteratorHasAsScala
-
-object StateMaterialization {
- val SerializedStateContentColumn = "serialized_state_content"
- val LoopCounterColumn = "loop_counter"
- private val BytesTypeMarker = "__texera_type__"
- private val BytesValue = "bytes"
- private val PayloadMarker = "payload"
-
- val schema: Schema = new Schema(
- new Attribute(SerializedStateContentColumn, AttributeType.STRING),
- new Attribute(LoopCounterColumn, AttributeType.LONG)
- )
-
- def stateUriFromResultUri(resultUri: URI): URI = {
- new URI(resultUri.toString.replace("/result", "/state"))
- }
-
- def serializeState(state: State): Tuple = {
- serializeStateMap(state)
- }
-
- def serializeStateMap(state: Map[String, Any]): Tuple = {
- val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L)
- val payloadJson = dumpsPayload(state.removed(LoopCounterColumn))
-
- Tuple
- .builder(schema)
- .addSequentially(Array(payloadJson, loopCounter))
- .build()
- }
-
- def deserializeState(tuple: Tuple): State = {
- deserializeStateMap(tuple)
- }
-
- def deserializeStateMap(tuple: Tuple): Map[String, Any] = {
- val serializedContent =
- Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
- val loopCounter = toLong(tuple.getField[Any](LoopCounterColumn))
- loadsPayload(serializedContent) + (LoopCounterColumn -> loopCounter)
- }
-
- def dumpsPayload(payload: Map[String, Any]): String = {
- objectMapper.writeValueAsString(payload.view.mapValues(normalizeForJson).toMap)
- }
-
- def loadsPayload(serializedPayload: String): Map[String, Any] = {
- val root = objectMapper.readTree(Option(serializedPayload).getOrElse("{}"))
- if (root == null || !root.isObject) {
- Map.empty
- } else {
- root.fields().asScala.map(entry => entry.getKey -> denormalizeFromJson(entry.getValue)).toMap
- }
- }
-
- private def normalizeForJson(value: Any): Any = {
- value match {
- case null => null
- case bytes: Array[Byte] =>
- Map(BytesTypeMarker -> BytesValue, PayloadMarker -> Base64.getEncoder.encodeToString(bytes))
- case map: Map[_, _] =>
- map.iterator.map { case (k, v) => k.toString -> normalizeForJson(v) }.toMap
- case iterable: Iterable[_] =>
- iterable.map(normalizeForJson).toList
- case other => other
- }
- }
-
- private def denormalizeFromJson(node: JsonNode): Any = {
- if (node == null || node.isNull) {
- null
- } else if (node.isObject) {
- val fields = node.fields().asScala.map(entry => entry.getKey -> entry.getValue).toMap
- fields.get(BytesTypeMarker) match {
- case Some(typeNode) if typeNode.isTextual && typeNode.asText() == BytesValue =>
- Base64.getDecoder.decode(fields(PayloadMarker).asText())
- case _ =>
- fields.view.mapValues(denormalizeFromJson).toMap
- }
- } else if (node.isArray) {
- node.elements().asScala.map(denormalizeFromJson).toList
- } else if (node.isBoolean) {
- node.asBoolean()
- } else if (node.isIntegralNumber) {
- node.longValue()
- } else if (node.isFloatingPointNumber) {
- node.doubleValue()
- } else {
- node.asText()
- }
- }
-
- private def toLong(value: Any): Long = {
- value match {
- case null => 0L
- case number: java.lang.Number => number.longValue()
- case text: String => text.toLong
- case other =>
- throw new IllegalArgumentException(s"Cannot convert $other to loop counter")
- }
- }
-}
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
index c110f9d814..14acad3455 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/package.scala
@@ -19,6 +19,110 @@
package org.apache.texera.amber.core
+import com.fasterxml.jackson.databind.JsonNode
+import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
+import org.apache.texera.amber.util.JSONUtils.objectMapper
+
+import java.net.URI
+import java.util.Base64
+import scala.jdk.CollectionConverters.IteratorHasAsScala
+
package object state {
type State = Map[String, Any]
+
+ val SerializedStateContentColumn = "serialized_state_content"
+ val LoopCounterColumn = "loop_counter"
+ private[state] val BytesTypeMarker = "__texera_type__"
+ private[state] val BytesValue = "bytes"
+ private[state] val PayloadMarker = "payload"
+
+ val stateMaterializationSchema: Schema = new Schema(
+ new Attribute(SerializedStateContentColumn, AttributeType.STRING),
+ new Attribute(LoopCounterColumn, AttributeType.LONG)
+ )
+
+ def stateUriFromResultUri(resultUri: URI): URI = {
+ new URI(resultUri.toString.replace("/result", "/state"))
+ }
+
+ def serializeState(state: State): Tuple = {
+ serializeStateMap(state)
+ }
+
+ def serializeStateMap(state: Map[String, Any]): Tuple = {
+ val loopCounter = state.get(LoopCounterColumn).map(toLong).getOrElse(0L)
+ val payloadJson = dumpsPayload(state.removed(LoopCounterColumn))
+
+ Tuple
+ .builder(stateMaterializationSchema)
+ .addSequentially(Array(payloadJson, loopCounter))
+ .build()
+ }
+
+ def deserializeState(tuple: Tuple): State = {
+ val serializedContent =
+ Option(tuple.getField[String](SerializedStateContentColumn)).getOrElse("{}")
+ val loopCounter = toLong(tuple.getField[Any](LoopCounterColumn))
+ loadsPayload(serializedContent) + (LoopCounterColumn -> loopCounter)
+ }
+
+ def dumpsPayload(payload: Map[String, Any]): String = {
+ objectMapper.writeValueAsString(payload.view.mapValues(normalizeForJson).toMap)
+ }
+
+ def loadsPayload(serializedPayload: String): Map[String, Any] = {
+ val root = objectMapper.readTree(Option(serializedPayload).getOrElse("{}"))
+ if (root == null || !root.isObject) {
+ Map.empty
+ } else {
+ root.fields().asScala.map(entry => entry.getKey -> denormalizeFromJson(entry.getValue)).toMap
+ }
+ }
+
+ private def normalizeForJson(value: Any): Any = {
+ value match {
+ case null => null
+ case bytes: Array[Byte] =>
+ Map(BytesTypeMarker -> BytesValue, PayloadMarker -> Base64.getEncoder.encodeToString(bytes))
+ case map: Map[_, _] =>
+ map.iterator.map { case (k, v) => k.toString -> normalizeForJson(v) }.toMap
+ case iterable: Iterable[_] =>
+ iterable.map(normalizeForJson).toList
+ case other => other
+ }
+ }
+
+ private def denormalizeFromJson(node: JsonNode): Any = {
+ if (node == null || node.isNull) {
+ null
+ } else if (node.isObject) {
+ val fields = node.fields().asScala.map(entry => entry.getKey -> entry.getValue).toMap
+ fields.get(BytesTypeMarker) match {
+ case Some(typeNode) if typeNode.isTextual && typeNode.asText() == BytesValue =>
+ Base64.getDecoder.decode(fields(PayloadMarker).asText())
+ case _ =>
+ fields.view.mapValues(denormalizeFromJson).toMap
+ }
+ } else if (node.isArray) {
+ node.elements().asScala.map(denormalizeFromJson).toList
+ } else if (node.isBoolean) {
+ node.asBoolean()
+ } else if (node.isIntegralNumber) {
+ node.longValue()
+ } else if (node.isFloatingPointNumber) {
+ node.doubleValue()
+ } else {
+ node.asText()
+ }
+ }
+
+ private def toLong(value: Any): Long = {
+ value match {
+ case null => 0L
+ case number: java.lang.Number => number.longValue()
+ case text: String => text.toLong
+ case other =>
+ throw new IllegalArgumentException(s"Cannot convert $other to loop counter")
+ }
+ }
}