This is an automated email from the ASF dual-hosted git repository.

Sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 171465f7 [feature][runtime] Support Fluss as ActionStateStore backend (#628)
171465f7 is described below

commit 171465f7f1b5f7841970fb9b657b84ceafead88e
Author: Junbo Wang <[email protected]>
AuthorDate: Thu May 14 09:26:08 2026 +0800

    [feature][runtime] Support Fluss as ActionStateStore backend (#628)
---
 .../api/configuration/AgentConfigOptions.java      |  42 ++
 docs/content/docs/operations/configuration.md      |  23 +-
 docs/content/docs/operations/deployment.md         |   2 +-
 docs/themes/book                                   |   2 +-
 pom.xml                                            |   2 +
 runtime/pom.xml                                    |  38 ++
 .../actionstate/ActionStateKafkaDeserializer.java  | 110 -----
 .../runtime/actionstate/ActionStateKafkaSeder.java |  85 +---
 ...eKafkaSerializer.java => ActionStateSerde.java} |  51 +-
 .../runtime/actionstate/ActionStateStore.java      |   3 +-
 .../runtime/actionstate/FlussActionStateStore.java | 542 +++++++++++++++++++++
 .../runtime/actionstate/KafkaActionStateStore.java |   5 +-
 .../runtime/operator/DurableExecutionManager.java  |  15 +-
 .../runtime/actionstate/ActionStateSerdeTest.java  |  77 +--
 .../runtime/actionstate/ActionStateUtilTest.java   |  36 +-
 .../actionstate/FlussActionStateStoreIT.java       | 330 +++++++++++++
 .../actionstate/FlussActionStateStoreSaslIT.java   | 168 +++++++
 .../actionstate/FlussActionStateStoreTest.java     | 173 +++++++
 .../actionstate/KafkaActionStateStoreTest.java     |  25 +-
 .../agents/runtime/actionstate/NoOpAction.java     |  44 ++
 20 files changed, 1458 insertions(+), 315 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 100894f7..608b7f78 100644
--- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -62,6 +62,48 @@ public class AgentConfigOptions {
     public static final ConfigOption<Integer> KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
             new ConfigOption<>("kafkaActionStateTopicReplicationFactor", Integer.class, 1);
 
+    /** The config parameter specifies the Fluss bootstrap servers. */
+    public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
+            new ConfigOption<>("flussBootstrapServers", String.class, "localhost:9123");
+
+    /** The config parameter specifies the Fluss database for action state. */
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
+            new ConfigOption<>("flussActionStateDatabase", String.class, "flink_agents");
+
+    /** The config parameter specifies the Fluss table name for action state. */
+    public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
+            new ConfigOption<>("flussActionStateTable", String.class, null);
+
+    /** The config parameter specifies the number of buckets for the Fluss action state table. */
+    public static final ConfigOption<Integer> FLUSS_ACTION_STATE_TABLE_BUCKETS =
+            new ConfigOption<>("flussActionStateTableBuckets", Integer.class, 64);
+
+    /**
+     * The config parameter specifies the authentication protocol for the Fluss client. Valid
+     * values: {@code "PLAINTEXT"} (default, no authentication) and {@code "SASL"} (SASL/PLAIN
+     * authentication). Value is case-insensitive.
+     */
+    public static final ConfigOption<String> FLUSS_SECURITY_PROTOCOL =
+            new ConfigOption<>("flussSecurityProtocol", String.class, "PLAINTEXT");
+
+    /** The config parameter specifies the SASL mechanism for Fluss authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_MECHANISM =
+            new ConfigOption<>("flussSaslMechanism", String.class, "PLAIN");
+
+    /**
+     * The config parameter specifies the JAAS configuration string for Fluss SASL authentication.
+     */
+    public static final ConfigOption<String> FLUSS_SASL_JAAS_CONFIG =
+            new ConfigOption<>("flussSaslJaasConfig", String.class, null);
+
+    /** The config parameter specifies the username for Fluss SASL authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_USERNAME =
+            new ConfigOption<>("flussSaslUsername", String.class, null);
+
+    /** The config parameter specifies the password for Fluss SASL authentication. */
+    public static final ConfigOption<String> FLUSS_SASL_PASSWORD =
+            new ConfigOption<>("flussSaslPassword", String.class, null);
+
     /** The config parameter specifies the unique identifier of job. */
     public static final ConfigOption<String> JOB_IDENTIFIER =
             new ConfigOption<>("job-identifier", String.class, null);
diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md
index 8a387def..82e08f7a 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -146,14 +146,35 @@ Here is the list of all built-in core configuration options.
 
 ### Action State Store
 
+#### Common
+
+| Key                          | Default          | Type    | Description                                                                                |
+|------------------------------|------------------|---------|--------------------------------------------------------------------------------------------|
+| `actionStateStoreBackend`    | (none)           | String  | The backend for action state store. Supported values: `"kafka"`, `"fluss"`.                |
+
 #### Kafka-based Action State Store
 
 Here are the configuration options for Kafka-based Action State Store.
 
 | Key                                 | Default                  | Type    | Description                                                                 |
 |-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
-| `actionStateStoreBackend`           | (none)                   | String  | The config parameter specifies the backend for action state store.          |
 | `kafkaBootstrapServers`             | "localhost:9092"         | String  | The config parameter specifies the Kafka bootstrap server.                  |
 | `kafkaActionStateTopic`             | (none)                   | String  | The config parameter specifies the Kafka topic for action state.            |
 | `kafkaActionStateTopicNumPartitions`| 64                       | Integer | The config parameter specifies the number of partitions for the Kafka action state topic. |
 | `kafkaActionStateTopicReplicationFactor` | 1                   | Integer | The config parameter specifies the replication factor for the Kafka action state topic. |
+
+#### Fluss-based Action State Store
+
+Here are the configuration options for Fluss-based Action State Store.
+
+| Key                            | Default          | Type    | Description                                                                                |
+|--------------------------------|------------------|---------|--------------------------------------------------------------------------------------------|
+| `flussBootstrapServers`        | "localhost:9123" | String  | The Fluss bootstrap servers address.                                                       |
+| `flussActionStateDatabase`     | "flink_agents"   | String  | The Fluss database name for storing action state.                                          |
+| `flussActionStateTable`        | (none)           | String  | The Fluss table name for storing action state. Must be explicitly configured.              |
+| `flussActionStateTableBuckets` | 64               | Integer | The number of buckets for the Fluss action state table.                                    |
+| `flussSecurityProtocol`        | "PLAINTEXT"      | String  | The authentication protocol for the Fluss client. Valid values: `PLAINTEXT` (default, no authentication), `SASL` (SASL/PLAIN authentication). |
+| `flussSaslMechanism`           | "PLAIN"          | String  | The SASL mechanism for Fluss authentication.                                               |
+| `flussSaslJaasConfig`          | (none)           | String  | The JAAS configuration string for Fluss SASL authentication.                               |
+| `flussSaslUsername`            | (none)           | String  | The username for Fluss SASL authentication.                                                |
+| `flussSaslPassword`            | (none)           | String  | The password for Fluss SASL authentication.                                                |
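
Putting the options together, a SASL-enabled setup would combine them roughly as follows (the
key/value layout is illustrative only; host and credential values are placeholders, and the SASL
keys are consulted only when `flussSecurityProtocol` is not `PLAINTEXT`):

    actionStateStoreBackend: fluss
    flussBootstrapServers: fluss-coordinator:9123
    flussActionStateTable: action_states
    flussSecurityProtocol: SASL
    flussSaslUsername: agents-user
    flussSaslPassword: agents-secret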
diff --git a/docs/content/docs/operations/deployment.md b/docs/content/docs/operations/deployment.md
index 1758c416..bb3cfb30 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -174,7 +174,7 @@ To ensure exactly-once action consistency, you must configure an external action
 The same persisted action state is also used by fine-grained durable execution.
 
 {{< hint info >}}
-**Note**: Currently, Kafka is supported as the external action state store.
+**Note**: Currently, Kafka and Fluss are supported as external action state stores.
 {{< /hint >}}
 
 See [Action State Store Configuration]({{< ref "docs/operations/configuration#action-state-store" >}}) for configuration options.
diff --git a/docs/themes/book b/docs/themes/book
index a486adf8..4018d4b5 160000
--- a/docs/themes/book
+++ b/docs/themes/book
@@ -1 +1 @@
-Subproject commit a486adf8462c0abfc9034436ddd72927d6656809
+Subproject commit 4018d4b51d10427bc7f80da0b0d535a4b4787ba0
diff --git a/pom.xml b/pom.xml
index cd774479..e77b0a51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@ under the License.
         <spotless.skip>false</spotless.skip>
         <flink.version>2.2.0</flink.version>
         <kafka.version>4.0.0</kafka.version>
+        <fluss.version>0.9.0-incubating</fluss.version>
         <junit5.version>5.10.1</junit5.version>
         <jackson.version>2.18.2</jackson.version>
         <pemja.version>0.5.5</pemja.version>
@@ -309,6 +310,7 @@ under the License.
                 <configuration>
                     <argLine>
                         --add-opens=java.base/java.util=ALL-UNNAMED
+                        --add-opens=java.base/java.nio=ALL-UNNAMED
                         --add-exports=java.base/jdk.internal.vm=ALL-UNNAMED
                     </argLine>
                 </configuration>
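
(The extra `--add-opens=java.base/java.nio=ALL-UNNAMED` is presumably needed because the Fluss
client touches direct ByteBuffer internals, e.g. through Apache Arrow, which JDK 17+ forbids by
default in surefire forks.)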
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 168fb2be..422c5c32 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -122,6 +122,44 @@ under the License.
             <artifactId>kafka-clients</artifactId>
             <version>${kafka.version}</version>
         </dependency>
+        <!-- fluss client -->
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${fluss.version}</version>
+        </dependency>
+        <!-- fluss test infrastructure -->
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${fluss.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${fluss.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+            <version>${fluss.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>5.4.0</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.junit.jupiter</groupId>
+                    <artifactId>junit-jupiter-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <!-- LOG -->
         <dependency>
             <groupId>org.slf4j</groupId>
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java
deleted file mode 100644
index 38e7b6ad..00000000
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.agents.runtime.actionstate;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.flink.agents.api.Event;
-import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.OutputEvent;
-import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Kafka deserializer for {@link ActionState}.
- *
- * <p>This deserializer handles the deserialization of byte arrays from Kafka back to ActionState
- * instances. It uses Jackson ObjectMapper with custom deserializers to handle polymorphic Event
- * types and ensures ActionTask is deserialized as null.
- */
-public class ActionStateKafkaDeserializer implements Deserializer<ActionState> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaDeserializer.class);
-    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // No configuration needed
-    }
-
-    @Override
-    public ActionState deserialize(String topic, byte[] data) {
-        if (data == null) {
-            return null;
-        }
-
-        try {
-            return OBJECT_MAPPER.readValue(data, ActionState.class);
-        } catch (Exception e) {
-            LOG.error("Failed to deserialize ActionState for topic: {}", topic, e);
-            throw new RuntimeException("Failed to deserialize ActionState", e);
-        }
-    }
-
-    @Override
-    public void close() {
-        // No resources to close
-    }
-
-    /** Creates and configures the ObjectMapper for ActionState deserialization. */
-    private static ObjectMapper createObjectMapper() {
-        ObjectMapper mapper = new ObjectMapper();
-
-        // Add type information for polymorphic Event deserialization
-        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
-        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
-        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
-
-        // Create a module for custom deserializers
-        SimpleModule module = new SimpleModule();
-
-        // Custom deserializer for ActionTask - always deserialize as null
-        module.addDeserializer(ActionTask.class, new ActionTaskDeserializer());
-
-        mapper.registerModule(module);
-
-        return mapper;
-    }
-
-    /** Mixin to add type information for Event hierarchy. */
-    @JsonTypeInfo(
-            use = JsonTypeInfo.Id.CLASS,
-            include = JsonTypeInfo.As.PROPERTY,
-            property = "@class")
-    public abstract static class EventTypeInfoMixin {}
-
-    /** Custom deserializer for ActionTask that always deserializes as null. */
-    public static class ActionTaskDeserializer extends JsonDeserializer<ActionTask> {
-        @Override
-        public ActionTask deserialize(JsonParser p, DeserializationContext ctxt)
-                throws IOException {
-            // Skip the value and return null
-            p.skipChildren();
-            return null;
-        }
-    }
-}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
index b225eb09..9af3dc3f 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
@@ -17,107 +17,34 @@
  */
 package org.apache.flink.agents.runtime.actionstate;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.flink.agents.api.Event;
-import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.OutputEvent;
-import org.apache.flink.agents.runtime.operator.ActionTask;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
- * Kafka serializer for {@link ActionState}.
- *
- * <p>This serializer handles the serialization of ActionState instances to byte arrays for storage
- * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle polymorphic Event types
- * and ensures ActionTask is serialized as null.
+ * Kafka serializer/deserializer for {@link ActionState}. Delegates to {@link ActionStateSerde} for
+ * the actual serialization logic.
  */
 public class ActionStateKafkaSeder implements Serializer<ActionState>, Deserializer<ActionState> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaSeder.class);
-    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
         // No configuration needed
     }
 
     @Override
-    public ActionState deserialize(String topic, byte[] data) {
-        if (data == null) {
-            return null;
-        }
-
-        try {
-            return OBJECT_MAPPER.readValue(data, ActionState.class);
-        } catch (Exception e) {
-            LOG.error("Failed to deserialize ActionState for topic: {}", 
topic, e);
-            throw new RuntimeException("Failed to deserialize ActionState", e);
-        }
+    public byte[] serialize(String topic, ActionState data) {
+        return data == null ? null : ActionStateSerde.serialize(data);
     }
 
     @Override
-    public byte[] serialize(String topic, ActionState data) {
-        if (data == null) {
-            return null;
-        }
-
-        try {
-            return OBJECT_MAPPER.writeValueAsBytes(data);
-        } catch (Exception e) {
-            LOG.error("Failed to serialize ActionState for topic: {}", topic, 
e);
-            throw new RuntimeException("Failed to serialize ActionState", e);
-        }
+    public ActionState deserialize(String topic, byte[] data) {
+        return data == null ? null : ActionStateSerde.deserialize(data);
     }
 
     @Override
     public void close() {
         // No resources to close
     }
-
-    /** Creates and configures the ObjectMapper for ActionState serialization. */
-    private static ObjectMapper createObjectMapper() {
-        ObjectMapper mapper = new ObjectMapper();
-
-        // Add type information for polymorphic Event deserialization
-        mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
-        mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
-        mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
-
-        // Create a module for custom serializers
-        SimpleModule module = new SimpleModule();
-
-        // Custom serializer for ActionTask - always serialize as null
-        module.addSerializer(ActionTask.class, new ActionTaskSerializer());
-
-        mapper.registerModule(module);
-
-        return mapper;
-    }
-
-    /** Mixin to add type information for Event hierarchy. */
-    @JsonTypeInfo(
-            use = JsonTypeInfo.Id.CLASS,
-            include = JsonTypeInfo.As.PROPERTY,
-            property = "@class")
-    public abstract static class EventTypeInfoMixin {}
-
-    /** Custom serializer for ActionTask that always serializes as null. */
-    public static class ActionTaskSerializer extends JsonSerializer<ActionTask> {
-        @Override
-        public void serialize(ActionTask value, JsonGenerator gen, SerializerProvider serializers)
-                throws IOException {
-            gen.writeNull();
-        }
-    }
 }
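
After this change the seder is a thin adapter over ActionStateSerde: the topic argument is still
accepted to satisfy the Kafka Serializer/Deserializer contract, but the delegate ignores it. A
brief illustrative round trip (assumes an existing ActionState instance named state):

    ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
    byte[] bytes = seder.serialize("any-topic", state);           // delegates to ActionStateSerde.serialize
    ActionState restored = seder.deserialize("any-topic", bytes); // delegates to ActionStateSerde.deserialize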
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
similarity index 67%
rename from runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java
rename to runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
index 881c5293..02c9a06b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
@@ -27,50 +27,40 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
- * Kafka serializer for {@link ActionState}.
+ * Backend-agnostic serializer/deserializer for {@link ActionState}.
  *
- * <p>This serializer handles the serialization of ActionState instances to byte arrays for storage
- * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle polymorphic Event types
- * and ensures ActionTask is serialized as null.
+ * <p>Uses Jackson {@link ObjectMapper} configured with polymorphic type information for the {@link
+ * Event} hierarchy and a custom null-serializer for {@link ActionTask}. Both Kafka and Fluss
+ * ActionStateStore backends delegate to this class for a consistent serialization format.
  */
-public class ActionStateKafkaSerializer implements Serializer<ActionState> {
+public final class ActionStateSerde {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ActionStateKafkaSerializer.class);
     private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
 
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        // No configuration needed
-    }
-
-    @Override
-    public byte[] serialize(String topic, ActionState data) {
-        if (data == null) {
-            return null;
-        }
+    private ActionStateSerde() {}
 
+    /** Serializes an {@link ActionState} to a JSON byte array. */
+    public static byte[] serialize(ActionState state) {
         try {
-            return OBJECT_MAPPER.writeValueAsBytes(data);
+            return OBJECT_MAPPER.writeValueAsBytes(state);
         } catch (Exception e) {
-            LOG.error("Failed to serialize ActionState for topic: {}", topic, e);
             throw new RuntimeException("Failed to serialize ActionState", e);
         }
     }
 
-    @Override
-    public void close() {
-        // No resources to close
+    /** Deserializes an {@link ActionState} from a JSON byte array. */
+    public static ActionState deserialize(byte[] data) {
+        try {
+            return OBJECT_MAPPER.readValue(data, ActionState.class);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to deserialize ActionState", e);
+        }
     }
 
-    /** Creates and configures the ObjectMapper for ActionState serialization. */
     private static ObjectMapper createObjectMapper() {
         ObjectMapper mapper = new ObjectMapper();
 
@@ -79,12 +69,9 @@ public class ActionStateKafkaSerializer implements Serializer<ActionState> {
         mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
         mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
 
-        // Create a module for custom serializers
-        SimpleModule module = new SimpleModule();
-
         // Custom serializer for ActionTask - always serialize as null
+        SimpleModule module = new SimpleModule();
         module.addSerializer(ActionTask.class, new ActionTaskSerializer());
-
         mapper.registerModule(module);
 
         return mapper;
@@ -95,10 +82,10 @@ public class ActionStateKafkaSerializer implements Serializer<ActionState> {
             use = JsonTypeInfo.Id.CLASS,
             include = JsonTypeInfo.As.PROPERTY,
             property = "@class")
-    public abstract static class EventTypeInfoMixin {}
+    abstract static class EventTypeInfoMixin {}
 
     /** Custom serializer for ActionTask that always serializes as null. */
-    public static class ActionTaskSerializer extends JsonSerializer<ActionTask> {
+    static class ActionTaskSerializer extends JsonSerializer<ActionTask> {
         @Override
         public void serialize(ActionTask value, JsonGenerator gen, SerializerProvider serializers)
                 throws IOException {
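
The extracted ActionStateSerde keeps the wire format of the old Kafka-only serializer unchanged:
events carry an "@class" type property so concrete InputEvent/OutputEvent subtypes survive the
round trip, and any ActionTask reference is written as JSON null. A sketch of the invariant
(state is an assumed existing ActionState):

    byte[] bytes = ActionStateSerde.serialize(state);
    ActionState restored = ActionStateSerde.deserialize(bytes);
    // restored.getTaskEvent() preserves the concrete Event subtype via "@class";
    // any ActionTask inside the state deserializes to null by design.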
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
index 433e2bea..e29557c0 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
@@ -26,7 +26,8 @@ import java.util.List;
 /** Interface for storing and retrieving the state of actions performed by agents. */
 public interface ActionStateStore extends AutoCloseable {
     enum BackendType {
-        KAFKA("kafka");
+        KAFKA("kafka"),
+        FLUSS("fluss");
 
         private final String type;
 
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java
new file mode 100644
index 00000000..b6993281
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongPredicate;
+
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SECURITY_PROTOCOL;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log table as the backend.
+ * All state is maintained in an in-memory map for fast lookups, with the Fluss log table providing
+ * durability and recovery support.
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FlussActionStateStore.class);
+
+    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+    private static final Duration REBUILD_TIMEOUT = Duration.ofMinutes(5);
+
+    private static final String SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+
+    // Column names in the Fluss table schema
+    private static final String COL_NAME_STATE_KEY = "state_key";
+    private static final String COL_NAME_STATE_PAYLOAD = "state_payload";
+    private static final String COL_NAME_AGENT_KEY = "agent_key";
+
+    // Column indices in the Fluss table schema
+    private static final int COL_STATE_KEY = 0;
+    private static final int COL_STATE_PAYLOAD = 1;
+
+    private final AgentConfiguration agentConfiguration;
+    private final String databaseName;
+    private final String tableName;
+    private final TablePath tablePath;
+
+    private final Connection connection;
+    private final Table table;
+    private final AppendWriter writer;
+
+    /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on recovery. */
+    private final Map<String, ActionState> actionStates;
+
+    @VisibleForTesting
+    FlussActionStateStore(
+            Map<String, ActionState> actionStates,
+            Connection connection,
+            Table table,
+            AppendWriter writer) {
+        this.agentConfiguration = null;
+        this.databaseName = null;
+        this.tableName = null;
+        this.tablePath = null;
+        this.actionStates = actionStates;
+        this.connection = connection;
+        this.table = table;
+        this.writer = writer;
+    }
+
+    public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+        this.agentConfiguration = agentConfiguration;
+        this.databaseName = agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+        this.tableName =
+                Preconditions.checkNotNull(
+                        agentConfiguration.get(FLUSS_ACTION_STATE_TABLE),
+                        "flussActionStateTable must be explicitly configured");
+        this.tablePath = TablePath.of(databaseName, tableName);
+        this.actionStates = new HashMap<>();
+
+        Configuration flussConf = new Configuration();
+        flussConf.setString(
+                BOOTSTRAP_SERVERS.key(), agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+        // Minimize latency for synchronous put(): setting batch linger time to zero ensures
+        // that each append is sent immediately without waiting for additional records to batch.
+        flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ZERO);
+
+        // Only set security/SASL parameters when the protocol requires authentication.
+        // When PLAINTEXT (the default), SASL parameters are semantically invalid and may
+        // cause the Fluss client to attempt an unwanted SASL handshake.
+        String securityProtocol = agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+        flussConf.setString(CLIENT_SECURITY_PROTOCOL, securityProtocol);
+        if (!SECURITY_PROTOCOL_PLAINTEXT.equalsIgnoreCase(securityProtocol)) {
+            flussConf.setString(
+                    CLIENT_SASL_MECHANISM, agentConfiguration.get(FLUSS_SASL_MECHANISM));
+
+            String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+            if (jaasConfig != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_CONFIG, jaasConfig);
+            }
+            String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+            if (username != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_USERNAME, username);
+            }
+            String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+            if (password != null) {
+                flussConf.setString(CLIENT_SASL_JAAS_PASSWORD, password);
+            }
+        }
+
+        this.connection = ConnectionFactory.createConnection(flussConf);
+        Table tbl = null;
+        try {
+            maybeCreateDatabaseAndTable();
+            tbl = connection.getTable(tablePath);
+            this.table = tbl;
+            this.writer = table.newAppend().createWriter();
+        } catch (Exception e) {
+            if (tbl != null) {
+                try {
+                    tbl.close();
+                } catch (Exception suppressed) {
+                    e.addSuppressed(suppressed);
+                }
+            }
+            try {
+                connection.close();
+            } catch (Exception suppressed) {
+                e.addSuppressed(suppressed);
+            }
+            throw new RuntimeException("Failed to initialize 
FlussActionStateStore", e);
+        }
+
+        LOG.info(
+                "Initialized FlussActionStateStore (log table) with table: 
{}.{}",
+                databaseName,
+                tableName);
+    }
+
+    @Override
+    public void put(Object key, long seqNum, Action action, Event event, ActionState state)
+            throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        byte[] payload = ActionStateSerde.serialize(state);
+
+        GenericRow row =
+                GenericRow.of(
+                        BinaryString.fromString(stateKey),
+                        payload,
+                        BinaryString.fromString(key.toString()));
+
+        // Synchronous write ensures the record is durable before returning.
+        // TODO: Optimize throughput via batching + flush() once Fluss supports it
+        //  (see
+        // https://github.com/apache/fluss/blob/5850c837/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java#L234-L241).
+        //  Note: steps affecting recovery correctness must remain synchronous.
+        writer.append(row).get();
+        actionStates.put(stateKey, state);
+
+        LOG.debug("Stored action state: key={}, isCompleted={}", stateKey, 
state.isCompleted());
+    }
+
+    @Override
+    public ActionState get(Object key, long seqNum, Action action, Event event) throws Exception {
+        String stateKey = generateKey(key, seqNum, action, event);
+        String keyPrefix = key.toString() + "_";
+
+        boolean hasDivergence = checkDivergence(key.toString(), seqNum);
+
+        if (!actionStates.containsKey(stateKey) || hasDivergence) {
+            removeStateEntries(keyPrefix, stateSeqNum -> stateSeqNum > seqNum);
+        }
+
+        ActionState state = actionStates.get(stateKey);
+        LOG.debug("Lookup action state: key={}, found={}", stateKey, state != 
null);
+        return state;
+    }
+
+    private boolean checkDivergence(String key, long seqNum) {
+        return actionStates.keySet().stream()
+                        .filter(k -> k.startsWith(key + "_" + seqNum + "_"))
+                        .count()
+                > 1;
+    }
+
+    /**
+     * Removes cached state entries whose key starts with {@code keyPrefix} and whose parsed
+     * sequence number satisfies {@code seqNumFilter}.
+     */
+    private void removeStateEntries(String keyPrefix, LongPredicate seqNumFilter) {
+        actionStates
+                .entrySet()
+                .removeIf(
+                        entry -> {
+                            if (!entry.getKey().startsWith(keyPrefix)) {
+                                return false;
+                            }
+                            try {
+                                List<String> parts = ActionStateUtil.parseKey(entry.getKey());
+                                if (parts.size() >= 2) {
+                                    long stateSeqNum = Long.parseLong(parts.get(1));
+                                    return seqNumFilter.test(stateSeqNum);
+                                }
+                            } catch (Exception e) {
+                                LOG.warn("Failed to parse state key: {}", entry.getKey(), e);
+                            }
+                            return false;
+                        });
+    }
+
+    /**
+     * Rebuilds in-memory state by scanning the Fluss log table. If recovery markers are provided,
+     * computes the minimum offset per bucket across all markers and subscribes from those offsets.
+     * Otherwise, skips rebuild since there is no checkpointed position to recover from. Reads from
+     * the start offset up to the latest offset captured at rebuild start. For the same state key
+     * appearing multiple times in the log, the latest record wins (last-write-wins).
+     */
+    @Override
+    public void rebuildState(List<Object> recoveryMarkers) {
+        LOG.info(
+                "Rebuilding action state from Fluss log table with {} recovery 
markers",
+                recoveryMarkers.size());
+
+        if (recoveryMarkers.isEmpty()) {
+            LOG.info("No recovery markers, skipping state rebuild");
+            return;
+        }
+
+        actionStates.clear();
+
+        Map<Integer, Long> bucketStartOffsets = mergeRecoveryMarkerOffsets(recoveryMarkers);
+        if (bucketStartOffsets.isEmpty()) {
+            LOG.info("No valid bucket offsets in recovery markers, skipping state rebuild");
+            return;
+        }
+
+        Map<Integer, Long> bucketEndOffsets = getBucketEndOffsets();
+        Map<Integer, Long> bucketEarliestOffsets = getBucketEarliestOffsets();
+        LOG.debug(
+                "Rebuild window: startOffsets={}, earliestOffsets={}, 
endOffsets={}",
+                bucketStartOffsets,
+                bucketEarliestOffsets,
+                bucketEndOffsets);
+
+        if (!bucketEndOffsets.keySet().equals(bucketStartOffsets.keySet())) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Recovery marker bucket set %s does not match the 
current table "
+                                    + "bucket set %s. The table may have been 
recreated or the "
+                                    + "bucket count changed since the last 
checkpoint. "
+                                    + "A fresh start without recovery markers 
is required.",
+                            bucketStartOffsets.keySet(), 
bucketEndOffsets.keySet()));
+        }
+
+        try (LogScanner scanner = table.newScan().createLogScanner()) {
+            Map<Integer, Long> remainingBuckets =
+                    subscribeEffectiveOffsets(
+                            scanner, bucketStartOffsets, bucketEndOffsets, bucketEarliestOffsets);
+            LOG.debug("Subscribed buckets for rebuild: {}", remainingBuckets);
+
+            pollAndReplay(scanner, remainingBuckets);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to rebuild state from Fluss log 
table", e);
+        }
+
+        LOG.info("Completed rebuilding state, recovered {} states", 
actionStates.size());
+    }
+
+    /**
+     * Merges recovery markers into a per-bucket start offset map. For each bucket, the minimum
+     * offset across all markers is used to cover the widest recovery window.
+     */
+    private Map<Integer, Long> mergeRecoveryMarkerOffsets(List<Object> recoveryMarkers) {
+        Map<Integer, Long> bucketStartOffsets = new HashMap<>();
+        for (Object marker : recoveryMarkers) {
+            if (marker instanceof Map) {
+                @SuppressWarnings("unchecked")
+                Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+                for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+                    bucketStartOffsets.merge(entry.getKey(), entry.getValue(), Math::min);
+                }
+            } else if (marker != null) {
+                LOG.warn(
+                        "Ignoring unrecognized recovery marker type: {}",
+                        marker.getClass().getName());
+            }
+        }
+        return bucketStartOffsets;
+    }
+
+    /**
+     * Validates effective offsets for each bucket and subscribes the scanner. Buckets with no new
+     * data are skipped; buckets with data loss (retention cleaned the recovery window) cause an
+     * immediate failure.
+     *
+     * @return a map of bucket-id to end-offset for buckets that need to be scanned
+     */
+    private Map<Integer, Long> subscribeEffectiveOffsets(
+            LogScanner scanner,
+            Map<Integer, Long> bucketStartOffsets,
+            Map<Integer, Long> bucketEndOffsets,
+            Map<Integer, Long> bucketEarliestOffsets) {
+        Map<Integer, Long> remainingBuckets = new HashMap<>();
+        for (Map.Entry<Integer, Long> entry : bucketStartOffsets.entrySet()) {
+            int bucket = entry.getKey();
+            long startOffset = entry.getValue();
+
+            if (!bucketEndOffsets.containsKey(bucket)
+                    || !bucketEarliestOffsets.containsKey(bucket)) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Recovery marker references bucket %d which 
does not exist in "
+                                        + "the current table (available 
buckets: %s). The table "
+                                        + "may have been recreated or bucket 
count changed. "
+                                        + "A fresh start without recovery 
markers is required.",
+                                bucket, bucketEndOffsets.keySet()));
+            }
+
+            long endOffset = bucketEndOffsets.get(bucket);
+            long earliestOffset = bucketEarliestOffsets.get(bucket);
+
+            // No new data since checkpoint (includes empty buckets that never had writes:
+            // endOffset=0, startOffset=0)
+            if (endOffset == startOffset) {
+                LOG.info(
+                        "Skipping bucket {} for rebuild: no new data "
+                                + "(endOffset={} = startOffset={})",
+                        bucket,
+                        endOffset,
+                        startOffset);
+                continue;
+            }
+
+            // Data loss: retention cleaned the recovery window, or log was truncated/reset
+            if (earliestOffset > startOffset || endOffset < startOffset) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Data loss detected for bucket %d: required 
data is no longer "
+                                        + "available in the log 
(startOffset=%d, endOffset=%d, "
+                                        + "earliestOffset=%d). Increase log 
retention or reduce "
+                                        + "checkpoint interval.",
+                                bucket, startOffset, endOffset, 
earliestOffset));
+            }
+
+            scanner.subscribe(bucket, startOffset);
+            remainingBuckets.put(bucket, endOffset);
+        }
+        return remainingBuckets;
+    }
+
+    /**
+     * Polls the scanner and replays deserialized action states until all subscribed buckets have
+     * been fully consumed up to their end offsets.
+     */
+    private void pollAndReplay(LogScanner scanner, Map<Integer, Long> remainingBuckets) {
+        long deadline = System.currentTimeMillis() + REBUILD_TIMEOUT.toMillis();
+        while (!remainingBuckets.isEmpty()) {
+            if (System.currentTimeMillis() > deadline) {
+                throw new RuntimeException(
+                        String.format(
+                                "State rebuild timed out after %s. "
+                                        + "Remaining buckets not fully 
consumed: %s",
+                                REBUILD_TIMEOUT, remainingBuckets.keySet()));
+            }
+            ScanRecords records = scanner.poll(POLL_TIMEOUT);
+            for (TableBucket bucket : records.buckets()) {
+                Long endOffset = remainingBuckets.get(bucket.getBucket());
+                long lastSeenOffset = replayRecords(records.records(bucket), endOffset);
+                if (lastSeenOffset + 1 >= endOffset) {
+                    remainingBuckets.remove(bucket.getBucket());
+                    scanner.unsubscribe(bucket.getBucket());
+                }
+            }
+        }
+    }
+
+    /**
+     * Replays records from a single bucket, deserializing and applying each action state. Returns
+     * the highest log offset seen (including records past endOffset), used to detect bucket
+     * completion.
+     */
+    private long replayRecords(Iterable<ScanRecord> records, long endOffset) {
+        long lastSeenOffset = -1;
+        for (ScanRecord record : records) {
+            lastSeenOffset = record.logOffset();
+            if (record.logOffset() >= endOffset) {
+                break;
+            }
+            InternalRow row = record.getRow();
+            String stateKey = row.getString(COL_STATE_KEY).toString();
+            byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+            ActionState state = ActionStateSerde.deserialize(payload);
+            actionStates.put(stateKey, state);
+        }
+        return lastSeenOffset;
+    }
+
+    private Map<Integer, Long> getBucketEndOffsets() {
+        return getBucketOffsets(new OffsetSpec.LatestSpec());
+    }
+
+    private Map<Integer, Long> getBucketEarliestOffsets() {
+        return getBucketOffsets(new OffsetSpec.EarliestSpec());
+    }
+
+    private Map<Integer, Long> getBucketOffsets(OffsetSpec offsetSpec) {
+        int numBuckets = table.getTableInfo().getNumBuckets();
+        try (Admin admin = connection.getAdmin()) {
+            List<Integer> buckets = new ArrayList<>();
+            for (int b = 0; b < numBuckets; b++) {
+                buckets.add(b);
+            }
+            return admin.listOffsets(tablePath, buckets, offsetSpec).all().get();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get offsets for Fluss table: " + tablePath, e);
+        }
+    }
+
+    /**
+     * Returns the end offsets of each bucket as a recovery marker. Similar to Kafka's
+     * implementation, this captures the current log position so that {@link #rebuildState} can
+     * resume from these offsets instead of scanning from the beginning.
+     */
+    @Override
+    public Object getRecoveryMarker() {
+        return getBucketEndOffsets();
+    }
+
+    /**
+     * Evicts pruned states from the in-memory cache. The Fluss log is append-only; physical cleanup
+     * relies on Fluss log retention configuration.
+     */
+    @Override
+    public void pruneState(Object key, long seqNum) {
+        LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key, 
seqNum);
+        removeStateEntries(key.toString() + "_", stateSeqNum -> stateSeqNum <= 
seqNum);
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    private void maybeCreateDatabaseAndTable() {
+        try (Admin admin = connection.getAdmin()) {
+            if (!admin.databaseExists(databaseName).get()) {
+                admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, true).get();
+                LOG.info("Created Fluss database: {}", databaseName);
+            }
+
+            if (!admin.tableExists(tablePath).get()) {
+                // No primaryKey() call — this creates an append-only log table in Fluss.
+                Schema schema =
+                        Schema.newBuilder()
+                                .column(COL_NAME_STATE_KEY, DataTypes.STRING())
+                                .column(COL_NAME_STATE_PAYLOAD, DataTypes.BYTES())
+                                .column(COL_NAME_AGENT_KEY, DataTypes.STRING())
+                                .build();
+
+                int buckets = agentConfiguration.get(FLUSS_ACTION_STATE_TABLE_BUCKETS);
+                TableDescriptor descriptor =
+                        TableDescriptor.builder()
+                                .schema(schema)
+                                .distributedBy(buckets, COL_NAME_AGENT_KEY)
+                                .comment("Flink Agents action state log")
+                                .build();
+
+                admin.createTable(tablePath, descriptor, true).get();
+                LOG.info("Created Fluss log table: {}", tablePath);
+            } else {
+                LOG.info("Fluss table {} already exists", tablePath);
+            }
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to create or verify Fluss database/table: {}.{}",
+                    databaseName,
+                    tableName,
+                    e);
+            throw new RuntimeException("Failed to create or verify Fluss 
database/table", e);
+        }
+    }
+}
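
The recovery-marker shape used by this store is a Map<Integer, Long> of bucket id to next offset
to read, as returned by getRecoveryMarker(). A small worked example of the Math::min merge in
mergeRecoveryMarkerOffsets (the offset values are illustrative):

    Map<Integer, Long> marker1 = Map.of(0, 5L, 1, 9L);
    Map<Integer, Long> marker2 = Map.of(0, 3L, 2, 4L);
    Map<Integer, Long> merged = new HashMap<>(marker1);
    marker2.forEach((bucket, offset) -> merged.merge(bucket, offset, Math::min));
    // merged = {0=3, 1=9, 2=4}: each bucket rewinds to the smallest checkpointed
    // offset, so the rebuild window covers every marker.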
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 648f146b..f09e5cd2 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -194,7 +194,9 @@ public class KafkaActionStateStore implements ActionStateStore {
     }
 
     private boolean checkDivergence(String key, long seqNum) {
-        return actionStates.keySet().stream().filter(k -> k.startsWith(key + "_" + seqNum)).count()
+        return actionStates.keySet().stream()
+                        .filter(k -> k.startsWith(key + "_" + seqNum + "_"))
+                        .count()
                 > 1;
     }
 
@@ -211,6 +213,7 @@ public class KafkaActionStateStore implements ActionStateStore {
             // Process recovery markers to get the smallest offsets for each partition
             for (Object marker : recoveryMarkers) {
                 if (marker instanceof Map) {
+                    @SuppressWarnings("unchecked")
                     Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
                     for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
                         Long offset =
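
The checkDivergence fix above matches the new Fluss implementation: state keys take the form
key_seqNum_..., so without the trailing underscore the prefix for seqNum 1 would also match
seqNum 10, 11, and so on. A quick illustration (the key values are made up):

    boolean falsePositive = "agent-1_10_actionA".startsWith("agent-1_1");  // true: wrong match
    boolean correct = "agent-1_10_actionA".startsWith("agent-1_1_");       // false: as intended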
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 16842d03..f8d1129a 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
+import org.apache.flink.agents.runtime.actionstate.FlussActionStateStore;
 import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
 import org.apache.flink.agents.runtime.context.ActionStatePersister;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
@@ -48,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
+import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.FLUSS;
 import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA;
 
 /**
@@ -113,10 +115,15 @@ class DurableExecutionManager implements ActionStatePersister, AutoCloseable {
      * @param config the agent configuration carrying the backend selection.
      */
     void maybeInitActionStateStore(AgentConfiguration config) {
-        if (actionStateStore == null
-                && 
KAFKA.getType().equalsIgnoreCase(config.get(ACTION_STATE_STORE_BACKEND))) {
-            LOG.info("Using Kafka as backend of action state store.");
-            actionStateStore = new KafkaActionStateStore(config);
+        if (actionStateStore == null) {
+            String backend = config.get(ACTION_STATE_STORE_BACKEND);
+            if (KAFKA.getType().equalsIgnoreCase(backend)) {
+                LOG.info("Using Kafka as backend of action state store.");
+                actionStateStore = new KafkaActionStateStore(config);
+            } else if (FLUSS.getType().equalsIgnoreCase(backend)) {
+                LOG.info("Using Fluss as backend of action state store.");
+                actionStateStore = new FlussActionStateStore(config);
+            }
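+            // Any other (or unset) backend value leaves actionStateStore null.
+            // A minimal sketch of opting in, assuming the BackendType's type string
+            // is the expected config value:
+            //   config.set(ACTION_STATE_STORE_BACKEND, FLUSS.getType());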
         }
     }
 
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
index ca510332..e0c4de0b 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
@@ -53,16 +53,13 @@ public class ActionStateSerdeTest {
         originalState.addShortTermMemoryUpdate(shortTermMemoryUpdate);
         originalState.addEvent(outputEvent);
 
-        // Test Kafka seder/deserializer
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
         // Serialize
-        byte[] serialized = seder.serialize("test-topic", originalState);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
 
         // Deserialize
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
         assertNotNull(deserializedState);
 
         // Verify taskEvent
@@ -102,10 +99,8 @@ public class ActionStateSerdeTest {
         originalState.addSensoryMemoryUpdate(memoryUpdate);
 
         // Test serialization/deserialization
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         // Verify taskEvent is null
         assertNull(deserializedState.getTaskEvent());
@@ -128,10 +123,8 @@ public class ActionStateSerdeTest {
         ActionState originalState = new ActionState(inputEvent);
 
         // Test serialization/deserialization
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         // Verify complex attributes are preserved
         InputEvent deserializedInputEvent = (InputEvent) deserializedState.getTaskEvent();
@@ -157,10 +150,8 @@ public class ActionStateSerdeTest {
         originalState.addCallResult(result2);
 
         // Test serialization/deserialization
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         // Verify call results
         assertEquals(2, deserializedState.getCallResultCount());
@@ -186,10 +177,8 @@ public class ActionStateSerdeTest {
         ActionState originalState = new ActionState(inputEvent);
         originalState.addCallResult(CallResult.pending("module.func", "digest"));
 
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         assertEquals(1, deserializedState.getCallResultCount());
         CallResult result = deserializedState.getCallResult(0);
@@ -215,10 +204,8 @@ public class ActionStateSerdeTest {
                         inputEvent, sensoryUpdates, shortTermUpdates, outputEvents, null, true);
 
         // Test serialization/deserialization
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         // Verify completed flag
         assertTrue(deserializedState.isCompleted());
@@ -242,10 +229,8 @@ public class ActionStateSerdeTest {
                 new ActionState(inputEvent, null, null, null, callResults, false);
 
         // Test serialization/deserialization
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         // Verify state
         assertFalse(deserializedState.isCompleted());
@@ -261,10 +246,8 @@ public class ActionStateSerdeTest {
         ActionState originalState = new ActionState(inputEvent);
         originalState.addCallResult(new CallResult("func", "digest", null, null));
 
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
-        byte[] serialized = seder.serialize("test-topic", originalState);
-        ActionState deserializedState = seder.deserialize("test-topic", serialized);
+        byte[] serialized = ActionStateSerde.serialize(originalState);
+        ActionState deserializedState = ActionStateSerde.deserialize(serialized);
 
         assertEquals(1, deserializedState.getCallResultCount());
         CallResult result = deserializedState.getCallResult(0);
@@ -310,9 +293,8 @@ public class ActionStateSerdeTest {
                         + "\"completed\":false"
                         + "}";
 
-        ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
         ActionState deserializedState =
-                seder.deserialize("test-topic", json.getBytes(StandardCharsets.UTF_8));
+                ActionStateSerde.deserialize(json.getBytes(StandardCharsets.UTF_8));
 
         assertEquals(2, deserializedState.getCallResultCount());
 
@@ -326,4 +308,29 @@ public class ActionStateSerdeTest {
         assertArrayEquals(
                 "exception".getBytes(StandardCharsets.UTF_8), 
legacyFailure.getExceptionPayload());
     }
+
+    @Test
+    public void testKafkaSederDelegatesToActionStateSerde() throws Exception {
+        InputEvent inputEvent = new InputEvent("test delegation");
+        ActionState originalState = new ActionState(inputEvent);
+
+        // Serialize via ActionStateSerde, deserialize via Kafka seder (and vice versa)
+        ActionStateKafkaSeder kafkaSeder = new ActionStateKafkaSeder();
+
+        byte[] serializedBySerde = ActionStateSerde.serialize(originalState);
+        byte[] serializedByKafka = kafkaSeder.serialize("test-topic", originalState);
+
+        ActionState fromSerde = ActionStateSerde.deserialize(serializedByKafka);
+        ActionState fromKafka = kafkaSeder.deserialize("test-topic", serializedBySerde);
+
+        // Both should produce identical results
+        assertArrayEquals(serializedBySerde, serializedByKafka);
+        assertEquals(
+                ((InputEvent) fromSerde.getTaskEvent()).getInput(),
+                ((InputEvent) fromKafka.getTaskEvent()).getInput());
+
+        // Kafka seder should handle nulls
+        assertNull(kafkaSeder.serialize("test-topic", null));
+        assertNull(kafkaSeder.deserialize("test-topic", null));
+    }
 }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
index a04be38a..2a90c1f1 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
@@ -17,10 +17,7 @@
  */
 package org.apache.flink.agents.runtime.actionstate;
 
-import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.context.RunnerContext;
-import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.junit.jupiter.api.Test;
 
@@ -38,7 +35,7 @@ public class ActionStateUtilTest {
     public void testGenerateKeyConsistency() throws Exception {
         // Create test data
         Object key = "consistency-test";
-        Action action = new TestAction("consistency-action");
+        Action action = new NoOpAction("consistency-action");
         InputEvent inputEvent = new InputEvent("same-input");
         InputEvent inputEvent2 = new InputEvent("same-input");
 
@@ -54,7 +51,7 @@ public class ActionStateUtilTest {
     public void testGenerateKeyDifferentInputs() throws Exception {
         // Create test data
         Object key = "diff-test";
-        Action action = new TestAction("diff-action");
+        Action action = new NoOpAction("diff-action");
         InputEvent inputEvent1 = new InputEvent("input1");
         InputEvent inputEvent2 = new InputEvent("input2");
 
@@ -68,7 +65,7 @@ public class ActionStateUtilTest {
 
     @Test
     public void testGenerateKeyWithNullKey() throws Exception {
-        Action action = new TestAction("test-action");
+        Action action = new NoOpAction("test-action");
         InputEvent inputEvent = new InputEvent("test-input");
 
         assertThrows(
@@ -93,7 +90,7 @@ public class ActionStateUtilTest {
     @Test
     public void testGenerateKeyWithNullEvent() throws Exception {
         Object key = "test-key";
-        Action action = new TestAction("test-action");
+        Action action = new NoOpAction("test-action");
 
         assertThrows(
                 NullPointerException.class,
@@ -106,7 +103,7 @@ public class ActionStateUtilTest {
     public void testParseKeyValidKey() throws Exception {
         // Create test data and generate a key
         Object key = "test-key";
-        Action action = new TestAction("test-action");
+        Action action = new NoOpAction("test-action");
         InputEvent inputEvent = new InputEvent("test-input");
         long seqNum = 123;
 
@@ -128,7 +125,7 @@ public class ActionStateUtilTest {
     public void testParseKeyRoundTrip() throws Exception {
         // Test that generate -> parse -> values match original inputs
         Object originalKey = "round-trip-test";
-        Action action = new TestAction("round-trip-action");
+        Action action = new NoOpAction("round-trip-action");
         InputEvent inputEvent = new InputEvent("round-trip-input");
         long seqNum = 456;
 
@@ -176,7 +173,7 @@ public class ActionStateUtilTest {
     public void testParseKeyWithSpecialCharacters() throws Exception {
         // Test with keys containing special characters (but not the separator)
         Object key = "key-with-special@chars#123";
-        Action action = new TestAction("action-with-special@chars");
+        Action action = new NoOpAction("action-with-special@chars");
         InputEvent inputEvent = new InputEvent("input-with-special@chars");
         long seqNum = 789;
 
@@ -190,7 +187,7 @@ public class ActionStateUtilTest {
     @Test
     public void testParseKeyConsistencyWithDifferentKeys() throws Exception {
         // Generate keys with different inputs and verify parsing consistency
-        Action action = new TestAction("consistency-action");
+        Action action = new NoOpAction("consistency-action");
         InputEvent inputEvent = new InputEvent("consistency-input");
 
         String key1 = ActionStateUtil.generateKey("key1", 100, action, inputEvent);
@@ -207,21 +204,4 @@ public class ActionStateUtilTest {
         assertEquals(parsed1.get(2), parsed2.get(2)); // Event UUID
         assertEquals(parsed1.get(3), parsed2.get(3)); // Action UUID
     }
-
-    private static class TestAction extends Action {
-
-        public static void doNothing(Event event, RunnerContext context) {
-            // No operation
-        }
-
-        public TestAction(String name) throws Exception {
-            super(
-                    name,
-                    new JavaFunction(
-                            TestAction.class.getName(),
-                            "doNothing",
-                            new Class[] {Event.class, RunnerContext.class}),
-                    List.of(InputEvent.EVENT_TYPE));
-        }
-    }
 }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java
new file mode 100644
index 00000000..0d4ddd06
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link FlussActionStateStore} against an embedded Fluss cluster. */
+public class FlussActionStateStoreIT {
+
+    private static final String TEST_DATABASE = "test_flink_agents";
+    private static final String TEST_TABLE = "action_state_it";
+    private static final String TEST_KEY = "test-key";
+
+    @RegisterExtension
+    static final FlussClusterExtension FLUSS_CLUSTER =
+            FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+
+    private FlussActionStateStore store;
+    private Action testAction;
+    private Event testEvent;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        AgentConfiguration config = createAgentConfiguration();
+        store = new FlussActionStateStore(config);
+
+        // Wait for table to be ready in the cluster
+        waitForTableReady();
+
+        testAction = new NoOpAction("test-action");
+        testEvent = new InputEvent("test-data");
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    // ==================== Basic CRUD ====================
+
+    @Test
+    void testPutAndGet() throws Exception {
+        ActionState state = new ActionState(testEvent);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state);
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+        assertThat(retrieved.isCompleted()).isFalse();
+    }
+
+    @Test
+    void testGetNonExistent() throws Exception {
+        ActionState result = store.get(TEST_KEY, 999L, testAction, testEvent);
+
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testMultipleSeqNums() throws Exception {
+        InputEvent event1 = new InputEvent("data-1");
+        InputEvent event2 = new InputEvent("data-2");
+        InputEvent event3 = new InputEvent("data-3");
+        ActionState state1 = new ActionState(event1);
+        ActionState state2 = new ActionState(event2);
+        ActionState state3 = new ActionState(event3);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state1);
+        store.put(TEST_KEY, 2L, testAction, testEvent, state2);
+        store.put(TEST_KEY, 3L, testAction, testEvent, state3);
+
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent).getTaskEvent()).isEqualTo(event1);
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent).getTaskEvent()).isEqualTo(event2);
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent).getTaskEvent()).isEqualTo(event3);
+    }
+
+    @Test
+    void testUpsertOverwrite() throws Exception {
+        ActionState original = new ActionState(new InputEvent("original"));
+        store.put(TEST_KEY, 1L, testAction, testEvent, original);
+
+        InputEvent updatedEvent = new InputEvent("updated");
+        ActionState updated = new ActionState(updatedEvent);
+        store.put(TEST_KEY, 1L, testAction, testEvent, updated);
+
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(updatedEvent);
+    }
+
+    // ==================== Pruning ====================
+
+    @Test
+    void testPruneSingleKey() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 3L, testAction, testEvent, new ActionState(testEvent));
+
+        store.pruneState(TEST_KEY, 2L);
+
+        // pruneState is synchronous (in-memory eviction)
+        // Check surviving entry first: get() with a missing key triggers divergence cleanup
+        // that removes entries with higher seqNums (same as Kafka backend behavior).
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNull();
+    }
+
+    // ==================== Recovery ====================
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testRecoveryMarkerReturnsBucketOffsets() {
+        Object marker = store.getRecoveryMarker();
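+        // The marker maps bucket id -> log offset; rebuildState replays the table
+        // log from these offsets (see the recovery tests below).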
+        assertThat(marker).isNotNull();
+        assertThat(marker).isInstanceOf(Map.class);
+        Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+        // With 1 bucket configured, we should have exactly 1 entry
+        assertThat(bucketOffsets).hasSize(1);
+        assertThat(bucketOffsets).containsKey(0);
+        assertThat(bucketOffsets.get(0)).isGreaterThanOrEqualTo(0L);
+    }
+
+    @Test
+    void testRebuildStateWithEmptyMarkersSkipsRebuild() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+
+        // rebuildState with empty markers should skip rebuild (aligned with Kafka backend)
+        store.rebuildState(Collections.emptyList());
+
+        // The in-memory cache is not cleared when rebuild is skipped,
+        // so the state should still be accessible
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testRebuildStateWithRecoveryMarkers() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+
+        // Capture recovery marker after writing data (simulates a checkpoint boundary)
+        Object marker = store.getRecoveryMarker();
+
+        // Write more data after the marker (simulates writes between checkpoint and crash)
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+
+        // Close to ensure all writes are fully committed before recovery
+        store.close();
+
+        // Simulate recovery: new store instance
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createAgentConfiguration());
+        try {
+            // Rebuild using the marker; should replay from marker offset to current end
+            recoveredStore.rebuildState(List.of(marker));
+
+            // Check surviving entry first: get() with a missing key triggers divergence cleanup
+            // that removes entries with higher seqNums (same as Kafka backend behavior).
+            // Data written after the marker should be recovered
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            // Data written before the marker should NOT be in the rebuilt cache
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+            // Prevent double-close in tearDown
+            store = null;
+        }
+    }
+
+    @Test
+    void testPruneWorksAfterRecovery() throws Exception {
+        // Capture recovery marker BEFORE writing data.
+        Object marker = store.getRecoveryMarker();
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.put(TEST_KEY, 3L, testAction, testEvent, new ActionState(testEvent));
+        store.close();
+
+        // Simulate recovery: new store instance
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createAgentConfiguration());
+        try {
+            // Rebuild state from the log using recovery markers
+            recoveredStore.rebuildState(List.of(marker));
+
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+
+            recoveredStore.pruneState(TEST_KEY, 2L);
+
+            // Check surviving entry first (get() divergence cleanup side-effect)
+            assertThat(recoveredStore.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+        }
+    }
+
+    // ==================== Multi-bucket ====================
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testMultiBucketRecovery() throws Exception {
+        // Use a separate database/table with 4 buckets to test the multi-bucket scenario
+        String multiDb = "test_flink_agents_multi";
+        String multiTable = "action_state_multi";
+        AgentConfiguration multiConfig = createAgentConfiguration(multiDb, multiTable, 4);
+        FlussActionStateStore multiStore = new FlussActionStateStore(multiConfig);
+        try {
+            waitForTableReady(multiDb, multiTable);
+
+            // Write states with different keys (likely distributed across buckets)
+            Action action1 = new NoOpAction("multi-action");
+            for (int i = 0; i < 10; i++) {
+                String key = "multi-key-" + i;
+                multiStore.put(key, 1L, action1, testEvent, new ActionState(testEvent));
+            }
+            }
+
+            // Verify all states are retrievable
+            for (int i = 0; i < 10; i++) {
+                String key = "multi-key-" + i;
+                assertThat(multiStore.get(key, 1L, action1, testEvent)).isNotNull();
+            }
+
+            // Recovery marker should contain all 4 buckets
+            Object marker = multiStore.getRecoveryMarker();
+            assertThat(marker).isInstanceOf(Map.class);
+            Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+            assertThat(bucketOffsets).hasSize(4);
+
+            // Write more data after marker
+            for (int i = 10; i < 15; i++) {
+                String key = "multi-key-" + i;
+                multiStore.put(key, 1L, action1, testEvent, new ActionState(testEvent));
+            }
+            multiStore.close();
+
+            // Recover into a new store instance
+            FlussActionStateStore recoveredStore = new FlussActionStateStore(multiConfig);
+            try {
+                recoveredStore.rebuildState(List.of(marker));
+
+                // Data written after marker should be recovered
+                for (int i = 10; i < 15; i++) {
+                    String key = "multi-key-" + i;
+                    assertThat(recoveredStore.get(key, 1L, action1, testEvent)).isNotNull();
+                }
+            } finally {
+                recoveredStore.close();
+            }
+        } finally {
+            multiStore.close();
+            // Prevent double-close in tearDown
+            store = null;
+        }
+    }
+
+    // ==================== Helpers ====================
+
+    private AgentConfiguration createAgentConfiguration() {
+        return createAgentConfiguration(TEST_DATABASE, TEST_TABLE, 1);
+    }
+
+    private AgentConfiguration createAgentConfiguration(
+            String database, String table, int buckets) {
+        AgentConfiguration config = new AgentConfiguration();
+        config.set(FLUSS_BOOTSTRAP_SERVERS, FLUSS_CLUSTER.getBootstrapServers());
+        config.set(FLUSS_ACTION_STATE_DATABASE, database);
+        config.set(FLUSS_ACTION_STATE_TABLE, table);
+        config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, buckets);
+        return config;
+    }
+
+    private void waitForTableReady() throws Exception {
+        waitForTableReady(TEST_DATABASE, TEST_TABLE);
+    }
+
+    private void waitForTableReady(String database, String table) throws Exception {
+        TablePath tablePath = TablePath.of(database, table);
+        try (org.apache.fluss.client.Connection conn =
+                        org.apache.fluss.client.ConnectionFactory.createConnection(
+                                FLUSS_CLUSTER.getClientConfig());
+                Admin admin = conn.getAdmin()) {
+            TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+            FLUSS_CLUSTER.waitUntilTableReady(tableInfo.getTableId());
+        }
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
new file mode 100644
index 00000000..547a83fb
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link FlussActionStateStore} with SASL/PLAIN authentication against an
+ * embedded Fluss cluster.
+ */
+public class FlussActionStateStoreSaslIT {
+
+    private static final String TEST_DATABASE = "test_flink_agents_sasl";
+    private static final String TEST_TABLE = "action_state_sasl_it";
+    private static final String TEST_KEY = "test-key";
+    private static final String SASL_USERNAME = "testuser";
+    private static final String SASL_PASSWORD = "testpass";
+
+    @RegisterExtension
+    static final FlussClusterExtension FLUSS_CLUSTER =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(1)
+                    .setCoordinatorServerListeners("FLUSS://localhost:0, CLIENT://localhost:0")
+                    .setTabletServerListeners("FLUSS://localhost:0, CLIENT://localhost:0")
+                    .setClusterConf(createSaslClusterConfig())
+                    .build();
+
+    private FlussActionStateStore store;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        AgentConfiguration config = createSaslAgentConfiguration();
+        store = new FlussActionStateStore(config);
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    void testPutAndGetWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-action");
+        InputEvent testEvent = new InputEvent("sasl-data");
+        ActionState state = new ActionState(testEvent);
+
+        store.put(TEST_KEY, 1L, testAction, testEvent, state);
+        ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+        assertThat(retrieved).isNotNull();
+        assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+    }
+
+    @Test
+    void testRecoveryWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-recovery-action");
+        InputEvent testEvent = new InputEvent("sasl-recovery-data");
+
+        // Write data and capture recovery marker
+        store.put(TEST_KEY, 1L, testAction, testEvent, new ActionState(testEvent));
+        Object marker = store.getRecoveryMarker();
+
+        // Write more data after marker
+        store.put(TEST_KEY, 2L, testAction, testEvent, new ActionState(testEvent));
+        store.close();
+
+        // Recover into a new store instance with SASL
+        FlussActionStateStore recoveredStore =
+                new FlussActionStateStore(createSaslAgentConfiguration());
+        try {
+            recoveredStore.rebuildState(java.util.List.of(marker));
+
+            // Data written after marker should be recovered
+            assertThat(recoveredStore.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+            // Data written before marker should NOT be in the rebuilt cache
+            assertThat(recoveredStore.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+        } finally {
+            recoveredStore.close();
+            store = null;
+        }
+    }
+
+    @Test
+    void testMultipleWritesWithSaslAuth() throws Exception {
+        Action testAction = new NoOpAction("sasl-multi-action");
+        InputEvent testEvent = new InputEvent("sasl-multi-data");
+
+        for (int i = 0; i < 5; i++) {
+            store.put("key-" + i, (long) i, testAction, testEvent, new 
ActionState(testEvent));
+        }
+
+        for (int i = 0; i < 5; i++) {
+            assertThat(store.get("key-" + i, (long) i, testAction, 
testEvent)).isNotNull();
+        }
+    }
+
+    // ==================== Helpers ====================
+
+    private static Configuration createSaslClusterConfig() {
+        Configuration conf = new Configuration();
+        conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(), "CLIENT:sasl");
+        conf.setString("security.sasl.enabled.mechanisms", "plain");
+        conf.setString(
+                "security.sasl.plain.jaas.config",
+                "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule 
required "
+                        + "    user_"
+                        + SASL_USERNAME
+                        + "=\""
+                        + SASL_PASSWORD
+                        + "\";");
+        return conf;
+    }
+
+    private AgentConfiguration createSaslAgentConfiguration() {
+        AgentConfiguration config = new AgentConfiguration();
+        String bootstrapServers =
+                String.join(
+                        ",",
+                        FLUSS_CLUSTER
+                                .getClientConfig("CLIENT")
+                                .get(ConfigOptions.BOOTSTRAP_SERVERS));
+        config.set(FLUSS_BOOTSTRAP_SERVERS, bootstrapServers);
+        config.set(FLUSS_ACTION_STATE_DATABASE, TEST_DATABASE);
+        config.set(FLUSS_ACTION_STATE_TABLE, TEST_TABLE);
+        config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, 1);
+        config.set(FLUSS_SECURITY_PROTOCOL, "SASL");
+        config.set(FLUSS_SASL_MECHANISM, "PLAIN");
+        config.set(FLUSS_SASL_USERNAME, SASL_USERNAME);
+        config.set(FLUSS_SASL_PASSWORD, SASL_PASSWORD);
+        return config;
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
new file mode 100644
index 00000000..308ad3d4
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.row.InternalRow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link FlussActionStateStore} store-level behavior. */
+public class FlussActionStateStoreTest {
+
+    private static final String TEST_KEY = "test-key";
+
+    private AppendWriter mockWriter;
+    private FlussActionStateStore store;
+    private Action testAction;
+    private Event testEvent;
+    private ActionState testActionState;
+    private Map<String, ActionState> actionStates;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockWriter = mock(AppendWriter.class);
+        when(mockWriter.append(any(InternalRow.class)))
+                .thenReturn(CompletableFuture.completedFuture(null));
+
+        actionStates = new HashMap<>();
+        store =
+                new FlussActionStateStore(
+                        actionStates, mock(Connection.class), mock(Table.class), mockWriter);
+
+        testAction = new NoOpAction("test-action");
+        testEvent = new InputEvent("test data");
+        testActionState = new ActionState(testEvent);
+    }
+
+    @Test
+    void testPutActionState() throws Exception {
+        store.put(TEST_KEY, 1L, testAction, testEvent, testActionState);
+
+        verify(mockWriter).append(any(InternalRow.class));
+
+        String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(actionStates).containsKey(stateKey);
+        assertThat(actionStates.get(stateKey)).isEqualTo(testActionState);
+    }
+
+    @Test
+    void testPutActionStateWriterFailure() throws Exception {
+        when(mockWriter.append(any(InternalRow.class)))
+                .thenReturn(CompletableFuture.failedFuture(new IOException("connection lost")));
+
+        FlussActionStateStore failStore =
+                new FlussActionStateStore(
+                        actionStates, mock(Connection.class), mock(Table.class), mockWriter);
+
+        assertThatThrownBy(
+                        () -> failStore.put(TEST_KEY, 1L, testAction, testEvent, testActionState))
+                .isInstanceOf(Exception.class);
+
+        // Cache should NOT be updated on write failure
+        String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent);
+        assertThat(actionStates).doesNotContainKey(stateKey);
+    }
+
+    @Test
+    void testGetTriggersDivergenceCleanup() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 2L, testAction, testEvent), testActionState);
+        // diverge: same key+seqNum, different action
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 2L, new NoOpAction("test-2"), testEvent),
+                testActionState);
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent), testActionState);
+
+        store.get(TEST_KEY, 2L, new NoOpAction("test-1"), testEvent);
+
+        // Divergence detected at seqNum 2 → removeIf clears seqNum > 2
+        assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+        assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNull();
+    }
+
+    // ==================== rebuildState tests ====================
+
+    @Test
+    void testRebuildStateSkipsOnEmptyMarkers() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        store.rebuildState(Collections.emptyList());
+
+        // Empty markers → rebuild is skipped, cache is NOT cleared
+        assertThat(actionStates).isNotEmpty();
+    }
+
+    @Test
+    void testRebuildStateSkipsOnNonMapMarker() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        // A non-Map marker is ignored, resulting in empty bucketStartOffsets.
+        // Note: rebuildState clears the cache before checking offsets,
+        // so the cache will be empty even though no actual rebuild occurs.
+        store.rebuildState(List.of("invalid-marker"));
+
+        assertThat(actionStates).isEmpty();
+    }
+
+    @Test
+    void testRebuildStateSkipsOnEmptyBucketOffsets() throws Exception {
+        actionStates.put(
+                ActionStateUtil.generateKey(TEST_KEY, 1L, testAction, testEvent), testActionState);
+
+        // Empty map marker → no valid bucket offsets.
+        // Same as above: cache is cleared before the early-return check.
+        store.rebuildState(List.of(Map.of()));
+
+        assertThat(actionStates).isEmpty();
+    }
+
+    @Test
+    void testCloseClosesResources() throws Exception {
+        Table mockTable = mock(Table.class);
+        Connection mockConnection = mock(Connection.class);
+
+        FlussActionStateStore closeableStore =
+                new FlussActionStateStore(actionStates, mockConnection, mockTable, mockWriter);
+
+        closeableStore.close();
+
+        verify(mockTable).close();
+        verify(mockConnection).close();
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
index 32d62f77..7cf829c0 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
@@ -19,9 +19,7 @@ package org.apache.flink.agents.runtime.actionstate;
 
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.plan.AgentConfiguration;
-import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -76,7 +74,7 @@ public class KafkaActionStateStoreTest {
                         TEST_TOPIC);
 
         // Create test objects
-        testAction = new TestAction("test-action");
+        testAction = new NoOpAction("test-action");
         testEvent = new InputEvent("test data");
         testActionState = new ActionState(testEvent);
     }
@@ -107,7 +105,7 @@ public class KafkaActionStateStoreTest {
         actionStates.put(
                 ActionStateUtil.generateKey(TEST_KEY, 4L, testAction, testEvent), testActionState);
 
-        actionStateStore.get(TEST_KEY, 2L, new TestAction("test-1"), testEvent);
+        actionStateStore.get(TEST_KEY, 2L, new NoOpAction("test-1"), testEvent);
 
         assertNotNull(actionStateStore.get(TEST_KEY, 1L, testAction, testEvent));
         assertNotNull(actionStateStore.get(TEST_KEY, 2L, testAction, testEvent));
@@ -123,7 +121,7 @@ public class KafkaActionStateStoreTest {
                 ActionStateUtil.generateKey(TEST_KEY, 2L, testAction, testEvent), testActionState);
         // diverge here
         actionStates.put(
-                ActionStateUtil.generateKey(TEST_KEY, 2L, new TestAction("test-2"), testEvent),
+                ActionStateUtil.generateKey(TEST_KEY, 2L, new NoOpAction("test-2"), testEvent),
                 testActionState);
         actionStates.put(
                 ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent), testActionState);
@@ -256,21 +254,4 @@ public class KafkaActionStateStoreTest {
                                 ActionStateUtil.generateKey(TEST_KEY, 3L, testAction, testEvent)))
                 .isEqualTo(thirdState);
     }
-
-    private static class TestAction extends Action {
-
-        public static void doNothing(Event event, RunnerContext context) {
-            // No operation
-        }
-
-        public TestAction(String name) throws Exception {
-            super(
-                    name,
-                    new JavaFunction(
-                            TestAction.class.getName(),
-                            "doNothing",
-                            new Class[] {Event.class, RunnerContext.class}),
-                    List.of(InputEvent.EVENT_TYPE));
-        }
-    }
 }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
new file mode 100644
index 00000000..cfc1f5d2
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.JavaFunction;
+import org.apache.flink.agents.plan.actions.Action;
+
+import java.util.List;
+
+/** A no-op {@link Action} for use in action state store tests. */
+public class NoOpAction extends Action {
+
+    public static void doNothing(Event event, RunnerContext context) {
+        // No operation
+    }
+
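+    // JavaFunction resolves doNothing reflectively by class name, method name, and
+    // parameter types, so its signature must stay in sync with the descriptor below.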
+    public NoOpAction(String name) throws Exception {
+        super(
+                name,
+                new JavaFunction(
+                        NoOpAction.class.getName(),
+                        "doNothing",
+                        new Class[] {Event.class, RunnerContext.class}),
+                List.of(InputEvent.class.getName()));
+    }
+}
