This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 9ada100c [hotfix] Fix the problems using Kafka as action state store 
(#510)
9ada100c is described below

commit 9ada100cdb0d6e24b50aa7aaaa365c531cac9ce9
Author: Xuannan <[email protected]>
AuthorDate: Sun Feb 1 10:12:01 2026 +0800

    [hotfix] Fix the problems using Kafka as action state store (#510)
    
    - Fix ActionState deserialization failure: Removed @JsonIgnore from
      PythonEvent.getEvent() so that the Python event bytes are persisted in
      ActionState. Without this, a recovered ActionState had null event bytes,
      causing "TypeError: a bytes-like object is required, not 'NoneType'"
      after recovery.
    - Fix ActionStateStore not being initialized during state recovery:
      Extracted the ActionStateStore initialization into
      maybeInitActionStateStore() and also call it from initializeState(),
      ensuring the store is available when rebuilding state from recovery
      markers.
    - Fix state key mismatch after recovery: Changed the Python Event.id from a
      random UUID to a deterministic, content-based UUID (MD5 hash). This
      ensures the same event produces the same ActionState key across
      restarts, enabling proper state lookup and divergence detection.
    - Fix Kafka consumer partition assignment: Use consumer.assign() instead of
      subscribe() for explicit partition control during state rebuild.
---
 python/flink_agents/api/events/event.py            | 36 ++++++++++++---
 .../runtime/actionstate/ActionStateUtil.java       | 15 ++-----
 .../runtime/actionstate/KafkaActionStateStore.java | 51 ++++++++++++++++++----
 .../runtime/operator/ActionExecutionOperator.java  | 24 +++++++---
 .../agents/runtime/python/event/PythonEvent.java   |  2 -
 .../runtime/python/event/PythonEventTest.java      |  4 +-
 6 files changed, 95 insertions(+), 37 deletions(-)

diff --git a/python/flink_agents/api/events/event.py 
b/python/flink_agents/api/events/event.py
index 44d32a25..b37c0ecd 100644
--- a/python/flink_agents/api/events/event.py
+++ b/python/flink_agents/api/events/event.py
@@ -15,6 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import hashlib
 from abc import ABC
 from typing import Any, Dict
 
@@ -22,7 +23,7 @@ try:
     from typing import override
 except ImportError:
     from typing_extensions import override
-from uuid import UUID, uuid4
+from uuid import UUID
 
 from pydantic import BaseModel, Field, model_validator
 from pydantic_core import PydanticSerializationError
@@ -30,16 +31,20 @@ from pyflink.common import Row
 
 
 class Event(BaseModel, ABC, extra="allow"):
-    """Base class for all event types in the system. Event allow extra 
properties, but
-    these properties are required isinstance of BaseModel, or json 
serializable.
+    """Base class for all event types in the system.
+
+    Event allows extra properties, but these must be BaseModel instances or 
JSON
+    serializable.
 
     Attributes:
     ----------
     id : UUID
-        Unique identifier for the event, automatically generated using uuid4.
+        Unique identifier for the event, generated deterministically based on
+        event content. This ensures events with identical content have the same
+        ID, which is critical for ActionStateStore divergence detection.
     """
 
-    id: UUID = Field(default_factory=uuid4)
+    id: UUID = Field(default=None)
 
     @staticmethod
     def __serialize_unknown(field: Any) -> Dict[str, Any]:
@@ -58,9 +63,23 @@ class Event(BaseModel, ABC, extra="allow"):
             kwargs["fallback"] = self.__serialize_unknown
         return super().model_dump_json(**kwargs)
 
+    def _generate_content_based_id(self) -> UUID:
+        """Generate a deterministic UUID based on event content using MD5 hash.
+
+        Similar to Java's UUID.nameUUIDFromBytes(), uses MD5 for version 3 
UUID.
+        """
+        # Serialize content excluding 'id' to avoid circular dependency
+        content_json = super().model_dump_json(
+            exclude={"id"}, fallback=self.__serialize_unknown
+        )
+        md5_hash = hashlib.md5(content_json.encode()).digest()
+        return UUID(bytes=md5_hash, version=3)
+
     @model_validator(mode="after")
-    def validate_extra(self) -> "Event":
-        """Ensure init fields is serializable."""
+    def validate_and_set_id(self) -> "Event":
+        """Validate that fields are serializable and generate content-based 
ID."""
+        if self.id is None:
+            object.__setattr__(self, "id", self._generate_content_based_id())
         self.model_dump_json()
         return self
 
@@ -68,6 +87,9 @@ class Event(BaseModel, ABC, extra="allow"):
         super().__setattr__(name, value)
         # Ensure added property can be serialized.
         self.model_dump_json()
+        # Regenerate ID if content changed (but not if setting 'id' itself)
+        if name != "id":
+            object.__setattr__(self, "id", self._generate_content_based_id())
 
 
 class InputEvent(Event):
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
index dac2d5d2..f6111636 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
@@ -59,22 +59,15 @@ public class ActionStateUtil {
     }
 
     private static String generateUUIDForEvent(Event event) throws IOException 
{
-        if (event instanceof InputEvent) {
+        if (event instanceof PythonEvent) {
+            PythonEvent pythonEvent = (PythonEvent) event;
+            return 
String.valueOf(UUID.nameUUIDFromBytes(pythonEvent.getEvent()));
+        } else if (event instanceof InputEvent) {
             InputEvent inputEvent = (InputEvent) event;
             byte[] inputEventBytes =
                     MAPPER.writeValueAsBytes(
                             new Object[] {inputEvent.getInput(), 
inputEvent.getAttributes()});
             return String.valueOf(UUID.nameUUIDFromBytes(inputEventBytes));
-        } else if (event instanceof PythonEvent) {
-            PythonEvent pythonEvent = (PythonEvent) event;
-            byte[] pythonEventBytes =
-                    MAPPER.writeValueAsBytes(
-                            new Object[] {
-                                pythonEvent.getEvent(),
-                                pythonEvent.getEventType(),
-                                pythonEvent.getAttributes()
-                            });
-            return String.valueOf(UUID.nameUUIDFromBytes(pythonEventBytes));
         } else {
             return String.valueOf(
                     UUID.nameUUIDFromBytes(
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 7782f993..648f146b 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -43,7 +43,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -121,7 +120,6 @@ public class KafkaActionStateStore implements 
ActionStateStore {
         this.producer = new KafkaProducer<>(producerProp);
         Properties consumerProp = createConsumerProp();
         this.consumer = new KafkaConsumer<>(consumerProp);
-        consumer.subscribe(Collections.singletonList(topic));
         LOG.info("Initialized KafkaActionStateStore with topic: {}", topic);
     }
 
@@ -138,7 +136,12 @@ public class KafkaActionStateStore implements 
ActionStateStore {
             ProducerRecord<String, ActionState> kafkaRecord =
                     new ProducerRecord<>(topic, stateKey, state);
             producer.send(kafkaRecord);
-            LOG.debug("Sent action state to Kafka for key: {}", stateKey);
+            actionStates.put(stateKey, state);
+            producer.flush();
+            LOG.debug(
+                    "Stored action state to Kafka: key={}, isCompleted={}",
+                    stateKey,
+                    state.isCompleted());
         } catch (Exception e) {
             throw new RuntimeException("Failed to send action state to Kafka", 
e);
         }
@@ -148,6 +151,13 @@ public class KafkaActionStateStore implements 
ActionStateStore {
     public ActionState get(Object key, long seqNum, Action action, Event 
event) throws Exception {
         String stateKey = generateKey(key, seqNum, action, event);
 
+        LOG.debug(
+                "Looking up action state: key={}, seqNum={}, stateKey={}, 
cachedStates={}",
+                key,
+                seqNum,
+                stateKey,
+                actionStates.keySet());
+
         boolean hasDivergence = checkDivergence(key.toString(), seqNum);
 
         if (!actionStates.containsKey(stateKey) || hasDivergence) {
@@ -173,7 +183,14 @@ public class KafkaActionStateStore implements 
ActionStateStore {
                             });
         }
 
-        return actionStates.get(stateKey);
+        ActionState result = actionStates.get(stateKey);
+        if (result != null) {
+            LOG.debug("Found action state: key={}, isCompleted={}", stateKey, 
result.isCompleted());
+        } else {
+            LOG.debug("Action state not found: key={}", stateKey);
+        }
+
+        return result;
     }
 
     private boolean checkDivergence(String key, long seqNum) {
@@ -184,6 +201,10 @@ public class KafkaActionStateStore implements 
ActionStateStore {
     @Override
     public void rebuildState(List<Object> recoveryMarkers) {
         LOG.info("Rebuilding state from {} recovery markers", 
recoveryMarkers.size());
+        if (recoveryMarkers.isEmpty()) {
+            LOG.info("No recovery markers, skipping state rebuild");
+            return;
+        }
 
         try {
             Map<Integer, Long> partitionMap = new HashMap<>();
@@ -201,9 +222,23 @@ public class KafkaActionStateStore implements 
ActionStateStore {
                     }
                 }
             }
-            partitionMap.forEach(
-                    (partition, offset) ->
-                            consumer.seek(new TopicPartition(topic, 
partition), offset));
+
+            // Build list of TopicPartitions to assign
+            List<TopicPartition> partitionsToAssign = new ArrayList<>();
+            for (Integer partition : partitionMap.keySet()) {
+                partitionsToAssign.add(new TopicPartition(topic, partition));
+            }
+
+            // Assign partitions
+            consumer.assign(partitionsToAssign);
+
+            // Seek to marker offsets
+            if (!partitionMap.isEmpty()) {
+                // Seek to marker offsets
+                partitionMap.forEach(
+                        (partition, offset) ->
+                                consumer.seek(new TopicPartition(topic, 
partition), offset));
+            }
 
             // Poll for records and rebuild state until the latest offsets
             while (true) {
@@ -236,7 +271,7 @@ public class KafkaActionStateStore implements 
ActionStateStore {
 
     @Override
     public void pruneState(Object key, long seqNum) {
-        LOG.info("Pruning state for key: {} up to sequence number: {}", key, 
seqNum);
+        LOG.debug("Pruning state for key: {} up to sequence number: {}", key, 
seqNum);
 
         // Remove states from in-memory cache for this key up to the specified 
sequence
         // number
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 3bc2912b..f4e58efe 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -268,13 +268,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
         keySegmentQueue = new SegmentedQueue();
 
-        // init the action state store with proper implementation
-        if (actionStateStore == null
-                && KAFKA.getType()
-                        
.equalsIgnoreCase(agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND))) {
-            LOG.info("Using Kafka as backend of action state store.");
-            actionStateStore = new 
KafkaActionStateStore(agentPlan.getConfig());
-        }
+        maybeInitActionStateStore();
 
         if (actionStateStore != null) {
             // init recovery marker state for recovery marker persistence
@@ -475,6 +469,10 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         // Check if action is already completed
         if (actionState != null && actionState.isCompleted()) {
             // Action has completed, skip execution and replay memory/events
+            LOG.debug(
+                    "Skipping already completed action: {} for key: {}",
+                    actionTask.action.getName(),
+                    key);
             isFinished = true;
             outputEvents = actionState.getOutputEvents();
             for (MemoryUpdate memoryUpdate : 
actionState.getShortTermMemoryUpdates()) {
@@ -739,6 +737,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
 
+        maybeInitActionStateStore();
+
         if (actionStateStore != null) {
             List<Object> markers = new ArrayList<>();
 
@@ -757,6 +757,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
             if (recoveryMarkers != null) {
                 recoveryMarkers.forEach(markers::add);
             }
+            LOG.info("Rebuilding action state from {} recovery markers", 
markers.size());
             actionStateStore.rebuildState(markers);
         }
 
@@ -1112,6 +1113,15 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
     }
 
+    private void maybeInitActionStateStore() {
+        if (actionStateStore == null
+                && KAFKA.getType()
+                        
.equalsIgnoreCase(agentPlan.getConfig().get(ACTION_STATE_STORE_BACKEND))) {
+            LOG.info("Using Kafka as backend of action state store.");
+            actionStateStore = new 
KafkaActionStateStore(agentPlan.getConfig());
+        }
+    }
+
     /** Failed to execute Action task. */
     public static class ActionTaskExecutionException extends Exception {
         public ActionTaskExecutionException(String message, Throwable cause) {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index 2abe7cb6..e0f9a906 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -20,7 +20,6 @@
 package org.apache.flink.agents.runtime.python.event;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.agents.api.Event;
 
@@ -61,7 +60,6 @@ public class PythonEvent extends Event {
         this.eventJsonStr = eventJsonStr;
     }
 
-    @JsonIgnore // Don't serialize byte array in logs - used for processing 
only
     public byte[] getEvent() {
         return event;
     }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
index 597f1014..142b2bf5 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/python/event/PythonEventTest.java
@@ -83,8 +83,8 @@ class PythonEventTest {
         assertThat(jsonNode.has("eventType")).isTrue();
         assertThat(jsonNode.has("eventJsonStr")).isTrue();
         assertThat(jsonNode.has("attributes")).isTrue();
-        // event bytes should not be serialized
-        assertThat(jsonNode.has("event")).isFalse();
+        // event bytes should be serialized for ActionState persistence
+        assertThat(jsonNode.has("event")).isTrue();
         assertThat(jsonNode.get("eventType").asText()).isEqualTo(eventType);
         
assertThat(jsonNode.get("eventJsonStr").asText()).isEqualTo(eventJsonStr);
         
assertThat(jsonNode.get("attributes").get("testKey").asText()).isEqualTo("testValue");

Reply via email to