This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 9ada100c [hotfix] Fix the problems using Kafka as action state store (#510)
9ada100c is described below
commit 9ada100cdb0d6e24b50aa7aaaa365c531cac9ce9
Author: Xuannan <[email protected]>
AuthorDate: Sun Feb 1 10:12:01 2026 +0800
[hotfix] Fix the problems using Kafka as action state store (#510)
- Fix ActionState deserialization failure: Removed @JsonIgnore from
PythonEvent.getEvent() so that Python event bytes are persisted in ActionState.
Without this, recovered ActionState had null event bytes, causing "TypeError: a
bytes-like object is required, not 'NoneType'" after recovery.
- Fix ActionStateStore not being initialized during state recovery: Moved
ActionStateStore initialization so that it also runs in initializeState(),
ensuring the store is available when rebuilding state from recovery markers
(see the first sketch after this list).
- Fix state key mismatch after recovery: Changed Python Event.id from a
random UUID to a deterministic content-based UUID (MD5 hash). This ensures the
same event produces the same ActionState key across restarts, enabling proper
state lookup and divergence detection (see the second sketch after this list).
- Fix Kafka consumer partition assignment: Use consumer.assign() instead of
subscribe() for explicit partition control during state rebuild (see the third
sketch after this list).
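As an illustration of the second fix, a minimal, self-contained sketch of the
idempotent-init guard (the class and field below are simplified stand-ins, not
the real operator): whichever lifecycle hook runs first, initializeState()
during recovery or open() on a fresh start, creates the store, and any later
call is a no-op.

    // Simplified stand-in for the operator; Object stands in for the
    // ActionStateStore field and its KafkaActionStateStore construction.
    public class LazyInitSketch {
        private Object actionStateStore;

        void maybeInitActionStateStore() {
            if (actionStateStore == null) {
                actionStateStore = new Object(); // created only on the first call
            }
        }

        public static void main(String[] args) {
            LazyInitSketch op = new LazyInitSketch();
            op.maybeInitActionStateStore(); // e.g. from initializeState() on recovery
            Object first = op.actionStateStore;
            op.maybeInitActionStateStore(); // e.g. from open(); a no-op
            System.out.println(first == op.actionStateStore); // prints: true
        }
    }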
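The third fix leans on the same property as java.util.UUID.nameUUIDFromBytes(),
which builds an MD5-based version 3 UUID: identical input bytes always yield
the identical UUID, so the same event maps to the same ActionState key across
restarts. A minimal sketch (the payload is a made-up example):

    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    public class ContentBasedIdSketch {
        public static void main(String[] args) {
            // Made-up event payload; any identical byte sequence behaves the same.
            byte[] payload = "{\"input\": 42}".getBytes(StandardCharsets.UTF_8);
            // nameUUIDFromBytes is deterministic: same bytes in, same UUID out.
            UUID beforeRestart = UUID.nameUUIDFromBytes(payload);
            UUID afterRestart = UUID.nameUUIDFromBytes(payload);
            System.out.println(beforeRestart.equals(afterRestart)); // prints: true
            System.out.println(beforeRestart.version());            // prints: 3
        }
    }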
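For the fourth fix, the distinction is that subscribe() joins a consumer group
and receives partitions only after a rebalance, so an immediate seek() fails
with IllegalStateException, while assign() pins partitions explicitly and
seeking to a recovery-marker offset is valid right away. A minimal sketch,
assuming kafka-clients on the classpath; the broker address, topic name, and
offset are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignForRebuildSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("action-state", 0); // placeholder
                // assign() takes explicit partition control; no group rebalance involved.
                consumer.assign(Collections.singletonList(tp));
                // Valid immediately after assign(); after subscribe() this would throw
                // until the group coordinator handed the partition to this consumer.
                consumer.seek(tp, 100L); // placeholder recovery-marker offset
            }
        }
    }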
---
python/flink_agents/api/events/event.py | 36 ++++++++++++---
.../runtime/actionstate/ActionStateUtil.java | 15 ++-----
.../runtime/actionstate/KafkaActionStateStore.java | 51 ++++++++++++++++++----
.../runtime/operator/ActionExecutionOperator.java | 24 +++++++---
.../agents/runtime/python/event/PythonEvent.java | 2 -
.../runtime/python/event/PythonEventTest.java | 4 +-
6 files changed, 95 insertions(+), 37 deletions(-)
diff --git a/python/flink_agents/api/events/event.py b/python/flink_agents/api/events/event.py
index 44d32a25..b37c0ecd 100644
--- a/python/flink_agents/api/events/event.py
+++ b/python/flink_agents/api/events/event.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import hashlib
from abc import ABC
from typing import Any, Dict
@@ -22,7 +23,7 @@ try:
from typing import override
except ImportError:
from typing_extensions import override
-from uuid import UUID, uuid4
+from uuid import UUID
from pydantic import BaseModel, Field, model_validator
from pydantic_core import PydanticSerializationError
@@ -30,16 +31,20 @@ from pyflink.common import Row
class Event(BaseModel, ABC, extra="allow"):
- """Base class for all event types in the system. Event allow extra
properties, but
- these properties are required isinstance of BaseModel, or json
serializable.
+ """Base class for all event types in the system.
+
+ Event allows extra properties, but these must be BaseModel instances or JSON
+ serializable.
Attributes:
----------
id : UUID
- Unique identifier for the event, automatically generated using uuid4.
+ Unique identifier for the event, generated deterministically based on
+ event content. This ensures events with identical content have the same
+ ID, which is critical for ActionStateStore divergence detection.
"""
- id: UUID = Field(default_factory=uuid4)
+ id: UUID = Field(default=None)
@staticmethod
def __serialize_unknown(field: Any) -> Dict[str, Any]:
@@ -58,9 +63,23 @@ class Event(BaseModel, ABC, extra="allow"):
kwargs["fallback"] = self.__serialize_unknown
return super().model_dump_json(**kwargs)
+ def _generate_content_based_id(self) -> UUID:
+ """Generate a deterministic UUID based on event content using MD5 hash.
+
+ Similar to Java's UUID.nameUUIDFromBytes(), uses MD5 for version 3 UUID.
+ """
+ # Serialize content excluding 'id' to avoid circular dependency
+ content_json = super().model_dump_json(
+ exclude={"id"}, fallback=self.__serialize_unknown
+ )
+ md5_hash = hashlib.md5(content_json.encode()).digest()
+ return UUID(bytes=md5_hash, version=3)
+
@model_validator(mode="after")
- def validate_extra(self) -> "Event":
- """Ensure init fields is serializable."""
+ def validate_and_set_id(self) -> "Event":
+ """Validate that fields are serializable and generate content-based
ID."""
+ if self.id is None:
+ object.__setattr__(self, "id", self._generate_content_based_id())
self.model_dump_json()
return self
@@ -68,6 +87,9 @@ class Event(BaseModel, ABC, extra="allow"):
super().__setattr__(name, value)
# Ensure added property can be serialized.
self.model_dump_json()
+ # Regenerate ID if content changed (but not if setting 'id' itself)
+ if name != "id":
+ object.__setattr__(self, "id", self._generate_content_based_id())
class InputEvent(Event):
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
index dac2d5d2..f6111636 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
@@ -59,22 +59,15 @@ public class ActionStateUtil {
}
private static String generateUUIDForEvent(Event event) throws IOException {
- if (event instanceof InputEvent) {
+ if (event instanceof PythonEvent) {
+ PythonEvent pythonEvent = (PythonEvent) event;
+ return String.valueOf(UUID.nameUUIDFromBytes(pythonEvent.getEvent()));
+ } else if (event instanceof InputEvent) {
InputEvent inputEvent = (InputEvent) event;
byte[] inputEventBytes =
MAPPER.writeValueAsBytes(
new Object[] {inputEvent.getInput(), inputEvent.getAttributes()});
return String.valueOf(UUID.nameUUIDFromBytes(inputEventBytes));
- } else if (event instanceof PythonEvent) {
- PythonEvent pythonEvent = (PythonEvent) event;
- byte[] pythonEventBytes =
- MAPPER.writeValueAsBytes(
- new Object[] {
- pythonEvent.getEvent(),
- pythonEvent.getEventType(),
- pythonEvent.getAttributes()
- });
- return String.valueOf(UUID.nameUUIDFromBytes(pythonEventBytes));
} else {
return String.valueOf(
UUID.nameUUIDFromBytes(
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 7782f993..648f146b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -121,7 +120,6 @@ public class KafkaActionStateStore implements ActionStateStore {
this.producer = new KafkaProducer<>(producerProp);
Properties consumerProp = createConsumerProp();
this.consumer = new KafkaConsumer<>(consumerProp);
- consumer.subscribe(Collections.singletonList(topic));
LOG.info("Initialized KafkaActionStateStore with topic: {}", topic);
}
@@ -138,7 +136,12 @@ public class KafkaActionStateStore implements ActionStateStore {
ProducerRecord<String, ActionState> kafkaRecord =
new ProducerRecord<>(topic, stateKey, state);
producer.send(kafkaRecord);
- LOG.debug("Sent action state to Kafka for key: {}", stateKey);
+ actionStates.put(stateKey, state);
+ producer.flush();
+ LOG.debug(
+ "Stored action state to Kafka: key={}, isCompleted={}",
+ stateKey,
+ state.isCompleted());
} catch (Exception e) {
throw new RuntimeException("Failed to send action state to Kafka", e);
}
@@ -148,6 +151,13 @@ public class KafkaActionStateStore implements ActionStateStore {
public ActionState get(Object key, long seqNum, Action action, Event event) throws Exception {
String stateKey = generateKey(key, seqNum, action, event);
String stateKey = generateKey(key, seqNum, action, event);
+ LOG.debug(
+ "Looking up action state: key={}, seqNum={}, stateKey={}, cachedStates={}",
+ key,
+ seqNum,
+ stateKey,
+ actionStates.keySet());
+
boolean hasDivergence = checkDivergence(key.toString(), seqNum);
if (!actionStates.containsKey(stateKey) || hasDivergence) {
@@ -173,7 +183,14 @@ public class KafkaActionStateStore implements ActionStateStore {
});
}
- return actionStates.get(stateKey);
+ ActionState result = actionStates.get(stateKey);
+ if (result != null) {
+ LOG.debug("Found action state: key={}, isCompleted={}", stateKey,
result.isCompleted());
+ } else {
+ LOG.debug("Action state not found: key={}", stateKey);
+ }
+
+ return result;
}
private boolean checkDivergence(String key, long seqNum) {
@@ -184,6 +201,10 @@ public class KafkaActionStateStore implements ActionStateStore {
@Override
public void rebuildState(List<Object> recoveryMarkers) {
LOG.info("Rebuilding state from {} recovery markers",
recoveryMarkers.size());
+ if (recoveryMarkers.isEmpty()) {
+ LOG.info("No recovery markers, skipping state rebuild");
+ return;
+ }
try {
Map<Integer, Long> partitionMap = new HashMap<>();
@@ -201,9 +222,23 @@ public class KafkaActionStateStore implements ActionStateStore {
}
}
}
- partitionMap.forEach(
- (partition, offset) ->
- consumer.seek(new TopicPartition(topic, partition), offset));
+
+ // Build list of TopicPartitions to assign
+ List<TopicPartition> partitionsToAssign = new ArrayList<>();
+ for (Integer partition : partitionMap.keySet()) {
+ partitionsToAssign.add(new TopicPartition(topic, partition));
+ }
+
+ // Assign partitions
+ consumer.assign(partitionsToAssign);
+
+ // Seek to marker offsets
+ if (!partitionMap.isEmpty()) {
+ // Seek to marker offsets
+ partitionMap.forEach(
+ (partition, offset) ->
+ consumer.seek(new TopicPartition(topic, partition), offset));
+ }
// Poll for records and rebuild state until the latest offsets
while (true) {
@@ -236,7 +271,7 @@ public class KafkaActionStateStore implements ActionStateStore {
@Override
public void pruneState(Object key, long seqNum) {
- LOG.info("Pruning state for key: {} up to sequence number: {}", key,
seqNum);
+ LOG.debug("Pruning state for key: {} up to sequence number: {}", key,
seqNum);
// Remove states from in-memory cache for this key up to the specified sequence
// number
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 3bc2912b..f4e58efe 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -268,13 +268,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
keySegmentQueue = new SegmentedQueue();
- // init the action state store with proper implementation
- if (actionStateStore == null
- && KAFKA.getType()
- .equalsIgnoreCase(agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND))) {
- LOG.info("Using Kafka as backend of action state store.");
- actionStateStore = new KafkaActionStateStore(agentPlan.getConfig());
- }
+ maybeInitActionStateStore();
if (actionStateStore != null) {
// init recovery marker state for recovery marker persistence
@@ -475,6 +469,10 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
// Check if action is already completed
if (actionState != null && actionState.isCompleted()) {
// Action has completed, skip execution and replay memory/events
+ LOG.debug(
+ "Skipping already completed action: {} for key: {}",
+ actionTask.action.getName(),
+ key);
isFinished = true;
outputEvents = actionState.getOutputEvents();
for (MemoryUpdate memoryUpdate : actionState.getShortTermMemoryUpdates()) {
@@ -739,6 +737,8 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+ maybeInitActionStateStore();
+
if (actionStateStore != null) {
List<Object> markers = new ArrayList<>();
@@ -757,6 +757,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
if (recoveryMarkers != null) {
recoveryMarkers.forEach(markers::add);
}
+ LOG.info("Rebuilding action state from {} recovery markers",
markers.size());
actionStateStore.rebuildState(markers);
}
@@ -1112,6 +1113,15 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
}
+ private void maybeInitActionStateStore() {
+ if (actionStateStore == null
+ && KAFKA.getType()
+ .equalsIgnoreCase(agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND))) {
+ LOG.info("Using Kafka as backend of action state store.");
+ actionStateStore = new KafkaActionStateStore(agentPlan.getConfig());
+ }
+ }
+
/** Failed to execute Action task. */
public static class ActionTaskExecutionException extends Exception {
public ActionTaskExecutionException(String message, Throwable cause) {
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index 2abe7cb6..e0f9a906 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -20,7 +20,6 @@
package org.apache.flink.agents.runtime.python.event;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.agents.api.Event;
@@ -61,7 +60,6 @@ public class PythonEvent extends Event {
this.eventJsonStr = eventJsonStr;
}
- @JsonIgnore // Don't serialize byte array in logs - used for processing only
public byte[] getEvent() {
return event;
}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
index 597f1014..142b2bf5 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
@@ -83,8 +83,8 @@ class PythonEventTest {
assertThat(jsonNode.has("eventType")).isTrue();
assertThat(jsonNode.has("eventJsonStr")).isTrue();
assertThat(jsonNode.has("attributes")).isTrue();
- // event bytes should not be serialized
- assertThat(jsonNode.has("event")).isFalse();
+ // event bytes should be serialized for ActionState persistence
+ assertThat(jsonNode.has("event")).isTrue();
assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType);
assertThat(jsonNode.get("eventJsonStr").asText()).isEqualTo(eventJsonStr);
assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue");