This is an automated email from the ASF dual-hosted git repository.
Sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 171465f7 [feature][runtime] Support Fluss as ActionStateStore backend
(#628)
171465f7 is described below
commit 171465f7f1b5f7841970fb9b657b84ceafead88e
Author: Junbo Wang <[email protected]>
AuthorDate: Thu May 14 09:26:08 2026 +0800
[feature][runtime] Support Fluss as ActionStateStore backend (#628)
---
.../api/configuration/AgentConfigOptions.java | 42 ++
docs/content/docs/operations/configuration.md | 23 +-
docs/content/docs/operations/deployment.md | 2 +-
docs/themes/book | 2 +-
pom.xml | 2 +
runtime/pom.xml | 38 ++
.../actionstate/ActionStateKafkaDeserializer.java | 110 -----
.../runtime/actionstate/ActionStateKafkaSeder.java | 85 +---
...eKafkaSerializer.java => ActionStateSerde.java} | 51 +-
.../runtime/actionstate/ActionStateStore.java | 3 +-
.../runtime/actionstate/FlussActionStateStore.java | 542 +++++++++++++++++++++
.../runtime/actionstate/KafkaActionStateStore.java | 5 +-
.../runtime/operator/DurableExecutionManager.java | 15 +-
.../runtime/actionstate/ActionStateSerdeTest.java | 77 +--
.../runtime/actionstate/ActionStateUtilTest.java | 36 +-
.../actionstate/FlussActionStateStoreIT.java | 330 +++++++++++++
.../actionstate/FlussActionStateStoreSaslIT.java | 168 +++++++
.../actionstate/FlussActionStateStoreTest.java | 173 +++++++
.../actionstate/KafkaActionStateStoreTest.java | 25 +-
.../agents/runtime/actionstate/NoOpAction.java | 44 ++
20 files changed, 1458 insertions(+), 315 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 100894f7..608b7f78 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -62,6 +62,48 @@ public class AgentConfigOptions {
public static final ConfigOption<Integer>
KAFKA_ACTION_STATE_TOPIC_REPLICATION_FACTOR =
new ConfigOption<>("kafkaActionStateTopicReplicationFactor",
Integer.class, 1);
+ /** The config parameter specifies the Fluss bootstrap servers. */
+ public static final ConfigOption<String> FLUSS_BOOTSTRAP_SERVERS =
+ new ConfigOption<>("flussBootstrapServers", String.class,
"localhost:9123");
+
+ /** The config parameter specifies the Fluss database for action state. */
+ public static final ConfigOption<String> FLUSS_ACTION_STATE_DATABASE =
+ new ConfigOption<>("flussActionStateDatabase", String.class,
"flink_agents");
+
+ /** The config parameter specifies the Fluss table name for action state.
*/
+ public static final ConfigOption<String> FLUSS_ACTION_STATE_TABLE =
+ new ConfigOption<>("flussActionStateTable", String.class, null);
+
+ /** The config parameter specifies the number of buckets for the Fluss
action state table. */
+ public static final ConfigOption<Integer> FLUSS_ACTION_STATE_TABLE_BUCKETS
=
+ new ConfigOption<>("flussActionStateTableBuckets", Integer.class,
64);
+
+ /**
+ * The config parameter specifies the authentication protocol for Fluss
client. Valid values:
+ * {@code "PLAINTEXT"} (default, no authentication) and {@code "SASL"}
(SASL/PLAIN
+ * authentication). Value is case-insensitive.
+ */
+ public static final ConfigOption<String> FLUSS_SECURITY_PROTOCOL =
+ new ConfigOption<>("flussSecurityProtocol", String.class,
"PLAINTEXT");
+
+ /** The config parameter specifies the SASL mechanism for Fluss
authentication. */
+ public static final ConfigOption<String> FLUSS_SASL_MECHANISM =
+ new ConfigOption<>("flussSaslMechanism", String.class, "PLAIN");
+
+ /**
+ * The config parameter specifies the JAAS configuration string for Fluss
SASL authentication.
+ */
+ public static final ConfigOption<String> FLUSS_SASL_JAAS_CONFIG =
+ new ConfigOption<>("flussSaslJaasConfig", String.class, null);
+
+ /** The config parameter specifies the username for Fluss SASL
authentication. */
+ public static final ConfigOption<String> FLUSS_SASL_USERNAME =
+ new ConfigOption<>("flussSaslUsername", String.class, null);
+
+ /** The config parameter specifies the password for Fluss SASL
authentication. */
+ public static final ConfigOption<String> FLUSS_SASL_PASSWORD =
+ new ConfigOption<>("flussSaslPassword", String.class, null);
+
/** The config parameter specifies the unique identifier of job. */
public static final ConfigOption<String> JOB_IDENTIFIER =
new ConfigOption<>("job-identifier", String.class, null);
diff --git a/docs/content/docs/operations/configuration.md
b/docs/content/docs/operations/configuration.md
index 8a387def..82e08f7a 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -146,14 +146,35 @@ Here is the list of all built-in core configuration
options.
### Action State Store
+#### Common
+
+| Key | Default | Type | Description
|
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `actionStateStoreBackend` | (none) | String | The backend for
action state store. Supported values: `"kafka"`, `"fluss"`. |
+
#### Kafka-based Action State Store
Here are the configuration options for Kafka-based Action State Store.
| Key | Default | Type |
Description |
|-------------------------------------|--------------------------|---------|-----------------------------------------------------------------------------|
-| `actionStateStoreBackend` | (none) | String |
The config parameter specifies the backend for action state store. |
| `kafkaBootstrapServers` | "localhost:9092" | String |
The config parameter specifies the Kafka bootstrap server. |
| `kafkaActionStateTopic` | (none) | String |
The config parameter specifies the Kafka topic for action state. |
| `kafkaActionStateTopicNumPartitions`| 64 | Integer |
The config parameter specifies the number of partitions for the Kafka action
state topic. |
| `kafkaActionStateTopicReplicationFactor` | 1 | Integer |
The config parameter specifies the replication factor for the Kafka action
state topic. |
+
+#### Fluss-based Action State Store
+
+Here are the configuration options for Fluss-based Action State Store.
+
+| Key | Default | Type | Description
|
+|------------------------------|------------------|---------|------------------------------------------------------------------------------------------|
+| `flussBootstrapServers` | "localhost:9123" | String | The Fluss
bootstrap servers address. |
+| `flussActionStateDatabase` | "flink_agents" | String | The Fluss
database name for storing action state. |
+| `flussActionStateTable` | "action_states" | String | The Fluss table
name for storing action state. |
+| `flussActionStateTableBuckets` | 64 | Integer | The number of
buckets for the Fluss action state table. |
+| `flussSecurityProtocol` | "PLAINTEXT" | String | The
authentication protocol for Fluss client. Valid values: `PLAINTEXT` (default,
no authentication), `SASL` (SASL/PLAIN authentication). |
+| `flussSaslMechanism` | "PLAIN" | String | The SASL
mechanism for Fluss authentication.
|
+| `flussSaslJaasConfig` | (none) | String | The JAAS
configuration string for Fluss SASL authentication.
|
+| `flussSaslUsername` | (none) | String | The username for
Fluss SASL authentication. |
+| `flussSaslPassword` | (none) | String | The password for
Fluss SASL authentication. |
diff --git a/docs/content/docs/operations/deployment.md
b/docs/content/docs/operations/deployment.md
index 1758c416..bb3cfb30 100644
--- a/docs/content/docs/operations/deployment.md
+++ b/docs/content/docs/operations/deployment.md
@@ -174,7 +174,7 @@ To ensure exactly-once action consistency, you must
configure an external action
The same persisted action state is also used by fine-grained durable execution.
{{< hint info >}}
-**Note**: Currently, Kafka is supported as the external action state store.
+**Note**: Currently, Kafka and Fluss are supported as the external action
state store.
{{< /hint >}}
See [Action State Store Configuration]({{< ref
"docs/operations/configuration#action-state-store" >}}) for configuration
options.
diff --git a/docs/themes/book b/docs/themes/book
index a486adf8..4018d4b5 160000
--- a/docs/themes/book
+++ b/docs/themes/book
@@ -1 +1 @@
-Subproject commit a486adf8462c0abfc9034436ddd72927d6656809
+Subproject commit 4018d4b51d10427bc7f80da0b0d535a4b4787ba0
diff --git a/pom.xml b/pom.xml
index cd774479..e77b0a51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,6 +43,7 @@ under the License.
<spotless.skip>false</spotless.skip>
<flink.version>2.2.0</flink.version>
<kafka.version>4.0.0</kafka.version>
+ <fluss.version>0.9.0-incubating</fluss.version>
<junit5.version>5.10.1</junit5.version>
<jackson.version>2.18.2</jackson.version>
<pemja.version>0.5.5</pemja.version>
@@ -309,6 +310,7 @@ under the License.
<configuration>
<argLine>
--add-opens=java.base/java.util=ALL-UNNAMED
+ --add-opens=java.base/java.nio=ALL-UNNAMED
--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED
</argLine>
</configuration>
diff --git a/runtime/pom.xml b/runtime/pom.xml
index 168fb2be..422c5c32 100644
--- a/runtime/pom.xml
+++ b/runtime/pom.xml
@@ -122,6 +122,44 @@ under the License.
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
+ <!-- fluss client -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-client</artifactId>
+ <version>${fluss.version}</version>
+ </dependency>
+ <!-- fluss test infrastructure -->
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${fluss.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${fluss.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.fluss</groupId>
+ <artifactId>fluss-test-utils</artifactId>
+ <version>${fluss.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>5.4.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- LOG -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java
deleted file mode 100644
index 38e7b6ad..00000000
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaDeserializer.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.agents.runtime.actionstate;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.flink.agents.api.Event;
-import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.OutputEvent;
-import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Kafka deserializer for {@link ActionState}.
- *
- * <p>This deserializer handles the deserialization of byte arrays from Kafka
back to ActionState
- * instances. It uses Jackson ObjectMapper with custom deserializers to handle
polymorphic Event
- * types and ensures ActionTask is deserialized as null.
- */
-public class ActionStateKafkaDeserializer implements Deserializer<ActionState>
{
-
- private static final Logger LOG =
LoggerFactory.getLogger(ActionStateKafkaDeserializer.class);
- private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // No configuration needed
- }
-
- @Override
- public ActionState deserialize(String topic, byte[] data) {
- if (data == null) {
- return null;
- }
-
- try {
- return OBJECT_MAPPER.readValue(data, ActionState.class);
- } catch (Exception e) {
- LOG.error("Failed to deserialize ActionState for topic: {}",
topic, e);
- throw new RuntimeException("Failed to deserialize ActionState", e);
- }
- }
-
- @Override
- public void close() {
- // No resources to close
- }
-
- /** Creates and configures the ObjectMapper for ActionState
deserialization. */
- private static ObjectMapper createObjectMapper() {
- ObjectMapper mapper = new ObjectMapper();
-
- // Add type information for polymorphic Event deserialization
- mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
- mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
- mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
-
- // Create a module for custom deserializers
- SimpleModule module = new SimpleModule();
-
- // Custom deserializer for ActionTask - always deserialize as null
- module.addDeserializer(ActionTask.class, new ActionTaskDeserializer());
-
- mapper.registerModule(module);
-
- return mapper;
- }
-
- /** Mixin to add type information for Event hierarchy. */
- @JsonTypeInfo(
- use = JsonTypeInfo.Id.CLASS,
- include = JsonTypeInfo.As.PROPERTY,
- property = "@class")
- public abstract static class EventTypeInfoMixin {}
-
- /** Custom deserializer for ActionTask that always deserializes as null. */
- public static class ActionTaskDeserializer extends
JsonDeserializer<ActionTask> {
- @Override
- public ActionTask deserialize(JsonParser p, DeserializationContext
ctxt)
- throws IOException {
- // Skip the value and return null
- p.skipChildren();
- return null;
- }
- }
-}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
index b225eb09..9af3dc3f 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSeder.java
@@ -17,107 +17,34 @@
*/
package org.apache.flink.agents.runtime.actionstate;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import org.apache.flink.agents.api.Event;
-import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.OutputEvent;
-import org.apache.flink.agents.runtime.operator.ActionTask;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
/**
- * Kafka serializer for {@link ActionState}.
- *
- * <p>This serializer handles the serialization of ActionState instances to
byte arrays for storage
- * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle
polymorphic Event types
- * and ensures ActionTask is serialized as null.
+ * Kafka serializer/deserializer for {@link ActionState}. Delegates to {@link
ActionStateSerde} for
+ * the actual serialization logic.
*/
public class ActionStateKafkaSeder implements Serializer<ActionState>,
Deserializer<ActionState> {
- private static final Logger LOG =
LoggerFactory.getLogger(ActionStateKafkaSeder.class);
- private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
-
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// No configuration needed
}
@Override
- public ActionState deserialize(String topic, byte[] data) {
- if (data == null) {
- return null;
- }
-
- try {
- return OBJECT_MAPPER.readValue(data, ActionState.class);
- } catch (Exception e) {
- LOG.error("Failed to deserialize ActionState for topic: {}",
topic, e);
- throw new RuntimeException("Failed to deserialize ActionState", e);
- }
+ public byte[] serialize(String topic, ActionState data) {
+ return data == null ? null : ActionStateSerde.serialize(data);
}
@Override
- public byte[] serialize(String topic, ActionState data) {
- if (data == null) {
- return null;
- }
-
- try {
- return OBJECT_MAPPER.writeValueAsBytes(data);
- } catch (Exception e) {
- LOG.error("Failed to serialize ActionState for topic: {}", topic,
e);
- throw new RuntimeException("Failed to serialize ActionState", e);
- }
+ public ActionState deserialize(String topic, byte[] data) {
+ return data == null ? null : ActionStateSerde.deserialize(data);
}
@Override
public void close() {
// No resources to close
}
-
- /** Creates and configures the ObjectMapper for ActionState serialization.
*/
- private static ObjectMapper createObjectMapper() {
- ObjectMapper mapper = new ObjectMapper();
-
- // Add type information for polymorphic Event deserialization
- mapper.addMixIn(Event.class, EventTypeInfoMixin.class);
- mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
- mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
-
- // Create a module for custom serializers
- SimpleModule module = new SimpleModule();
-
- // Custom serializer for ActionTask - always serialize as null
- module.addSerializer(ActionTask.class, new ActionTaskSerializer());
-
- mapper.registerModule(module);
-
- return mapper;
- }
-
- /** Mixin to add type information for Event hierarchy. */
- @JsonTypeInfo(
- use = JsonTypeInfo.Id.CLASS,
- include = JsonTypeInfo.As.PROPERTY,
- property = "@class")
- public abstract static class EventTypeInfoMixin {}
-
- /** Custom serializer for ActionTask that always serializes as null. */
- public static class ActionTaskSerializer extends
JsonSerializer<ActionTask> {
- @Override
- public void serialize(ActionTask value, JsonGenerator gen,
SerializerProvider serializers)
- throws IOException {
- gen.writeNull();
- }
- }
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
similarity index 67%
rename from
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java
rename to
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
index 881c5293..02c9a06b 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateKafkaSerializer.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerde.java
@@ -27,50 +27,40 @@ import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.kafka.common.serialization.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Map;
/**
- * Kafka serializer for {@link ActionState}.
+ * Backend-agnostic serializer/deserializer for {@link ActionState}.
*
- * <p>This serializer handles the serialization of ActionState instances to
byte arrays for storage
- * in Kafka. It uses Jackson ObjectMapper with custom serializers to handle
polymorphic Event types
- * and ensures ActionTask is serialized as null.
+ * <p>Uses Jackson {@link ObjectMapper} configured with polymorphic type
information for the {@link
+ * Event} hierarchy and a custom null-serializer for {@link ActionTask}. Both
Kafka and Fluss
+ * ActionStateStore backends delegate to this class for consistent
serialization format.
*/
-public class ActionStateKafkaSerializer implements Serializer<ActionState> {
+public final class ActionStateSerde {
- private static final Logger LOG =
LoggerFactory.getLogger(ActionStateKafkaSerializer.class);
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- // No configuration needed
- }
-
- @Override
- public byte[] serialize(String topic, ActionState data) {
- if (data == null) {
- return null;
- }
+ private ActionStateSerde() {}
+ /** Serializes an {@link ActionState} to a JSON byte array. */
+ public static byte[] serialize(ActionState state) {
try {
- return OBJECT_MAPPER.writeValueAsBytes(data);
+ return OBJECT_MAPPER.writeValueAsBytes(state);
} catch (Exception e) {
- LOG.error("Failed to serialize ActionState for topic: {}", topic,
e);
throw new RuntimeException("Failed to serialize ActionState", e);
}
}
- @Override
- public void close() {
- // No resources to close
+ /** Deserializes an {@link ActionState} from a JSON byte array. */
+ public static ActionState deserialize(byte[] data) {
+ try {
+ return OBJECT_MAPPER.readValue(data, ActionState.class);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to deserialize ActionState", e);
+ }
}
- /** Creates and configures the ObjectMapper for ActionState serialization.
*/
private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
@@ -79,12 +69,9 @@ public class ActionStateKafkaSerializer implements
Serializer<ActionState> {
mapper.addMixIn(InputEvent.class, EventTypeInfoMixin.class);
mapper.addMixIn(OutputEvent.class, EventTypeInfoMixin.class);
- // Create a module for custom serializers
- SimpleModule module = new SimpleModule();
-
// Custom serializer for ActionTask - always serialize as null
+ SimpleModule module = new SimpleModule();
module.addSerializer(ActionTask.class, new ActionTaskSerializer());
-
mapper.registerModule(module);
return mapper;
@@ -95,10 +82,10 @@ public class ActionStateKafkaSerializer implements
Serializer<ActionState> {
use = JsonTypeInfo.Id.CLASS,
include = JsonTypeInfo.As.PROPERTY,
property = "@class")
- public abstract static class EventTypeInfoMixin {}
+ abstract static class EventTypeInfoMixin {}
/** Custom serializer for ActionTask that always serializes as null. */
- public static class ActionTaskSerializer extends
JsonSerializer<ActionTask> {
+ static class ActionTaskSerializer extends JsonSerializer<ActionTask> {
@Override
public void serialize(ActionTask value, JsonGenerator gen,
SerializerProvider serializers)
throws IOException {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
index 433e2bea..e29557c0 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateStore.java
@@ -26,7 +26,8 @@ import java.util.List;
/** Interface for storing and retrieving the state of actions performed by
agents. */
public interface ActionStateStore extends AutoCloseable {
enum BackendType {
- KAFKA("kafka");
+ KAFKA("kafka"),
+ FLUSS("fluss");
private final String type;
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java
new file mode 100644
index 00000000..b6993281
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStore.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.OffsetSpec;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.log.LogScanner;
+import org.apache.fluss.client.table.scanner.log.ScanRecords;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.DatabaseDescriptor;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.BinaryString;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongPredicate;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_JAAS_CONFIG;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_CONFIG;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_PASSWORD;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_JAAS_USERNAME;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SASL_MECHANISM;
+import static org.apache.fluss.config.ConfigOptions.CLIENT_SECURITY_PROTOCOL;
+
+/**
+ * An implementation of {@link ActionStateStore} that uses an Apache Fluss log
table as the backend.
+ * All state is maintained in an in-memory map for fast lookups, with the
Fluss log table providing
+ * durability and recovery support.
+ */
+public class FlussActionStateStore implements ActionStateStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlussActionStateStore.class);
+
+ private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);
+ private static final Duration REBUILD_TIMEOUT = Duration.ofMinutes(5);
+
+ private static final String SECURITY_PROTOCOL_PLAINTEXT = "PLAINTEXT";
+
+ // Column names in the Fluss table schema
+ private static final String COL_NAME_STATE_KEY = "state_key";
+ private static final String COL_NAME_STATE_PAYLOAD = "state_payload";
+ private static final String COL_NAME_AGENT_KEY = "agent_key";
+
+ // Column indices in the Fluss table schema
+ private static final int COL_STATE_KEY = 0;
+ private static final int COL_STATE_PAYLOAD = 1;
+
+ private final AgentConfiguration agentConfiguration;
+ private final String databaseName;
+ private final String tableName;
+ private final TablePath tablePath;
+
+ private final Connection connection;
+ private final Table table;
+ private final AppendWriter writer;
+
+ /** In-memory cache for O(1) state lookups; rebuilt from Fluss log on
recovery. */
+ private final Map<String, ActionState> actionStates;
+
+ @VisibleForTesting
+ FlussActionStateStore(
+ Map<String, ActionState> actionStates,
+ Connection connection,
+ Table table,
+ AppendWriter writer) {
+ this.agentConfiguration = null;
+ this.databaseName = null;
+ this.tableName = null;
+ this.tablePath = null;
+ this.actionStates = actionStates;
+ this.connection = connection;
+ this.table = table;
+ this.writer = writer;
+ }
+
+ public FlussActionStateStore(AgentConfiguration agentConfiguration) {
+ this.agentConfiguration = agentConfiguration;
+ this.databaseName =
agentConfiguration.get(FLUSS_ACTION_STATE_DATABASE);
+ this.tableName =
+ Preconditions.checkNotNull(
+ agentConfiguration.get(FLUSS_ACTION_STATE_TABLE),
+ "flussActionStateTable must be explicitly configured");
+ this.tablePath = TablePath.of(databaseName, tableName);
+ this.actionStates = new HashMap<>();
+
+ Configuration flussConf = new Configuration();
+ flussConf.setString(
+ BOOTSTRAP_SERVERS.key(),
agentConfiguration.get(FLUSS_BOOTSTRAP_SERVERS));
+ // Minimize latency for synchronous put(): setting batch linger time
to zero ensures
+ // that each append is sent immediately without waiting for additional
records to batch.
+ flussConf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT,
Duration.ZERO);
+
+ // Only set security/SASL parameters when the protocol requires
authentication.
+ // When PLAINTEXT (the default), SASL parameters are semantically
invalid and may
+ // cause the Fluss client to attempt an unwanted SASL handshake.
+ String securityProtocol =
agentConfiguration.get(FLUSS_SECURITY_PROTOCOL);
+ flussConf.setString(CLIENT_SECURITY_PROTOCOL, securityProtocol);
+ if (!SECURITY_PROTOCOL_PLAINTEXT.equalsIgnoreCase(securityProtocol)) {
+ flussConf.setString(
+ CLIENT_SASL_MECHANISM,
agentConfiguration.get(FLUSS_SASL_MECHANISM));
+
+ String jaasConfig = agentConfiguration.get(FLUSS_SASL_JAAS_CONFIG);
+ if (jaasConfig != null) {
+ flussConf.setString(CLIENT_SASL_JAAS_CONFIG, jaasConfig);
+ }
+ String username = agentConfiguration.get(FLUSS_SASL_USERNAME);
+ if (username != null) {
+ flussConf.setString(CLIENT_SASL_JAAS_USERNAME, username);
+ }
+ String password = agentConfiguration.get(FLUSS_SASL_PASSWORD);
+ if (password != null) {
+ flussConf.setString(CLIENT_SASL_JAAS_PASSWORD, password);
+ }
+ }
+
+ this.connection = ConnectionFactory.createConnection(flussConf);
+ Table tbl = null;
+ try {
+ maybeCreateDatabaseAndTable();
+ tbl = connection.getTable(tablePath);
+ this.table = tbl;
+ this.writer = table.newAppend().createWriter();
+ } catch (Exception e) {
+ if (tbl != null) {
+ try {
+ tbl.close();
+ } catch (Exception suppressed) {
+ e.addSuppressed(suppressed);
+ }
+ }
+ try {
+ connection.close();
+ } catch (Exception suppressed) {
+ e.addSuppressed(suppressed);
+ }
+ throw new RuntimeException("Failed to initialize
FlussActionStateStore", e);
+ }
+
+ LOG.info(
+ "Initialized FlussActionStateStore (log table) with table:
{}.{}",
+ databaseName,
+ tableName);
+ }
+
+ @Override
+ public void put(Object key, long seqNum, Action action, Event event,
ActionState state)
+ throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ byte[] payload = ActionStateSerde.serialize(state);
+
+ GenericRow row =
+ GenericRow.of(
+ BinaryString.fromString(stateKey),
+ payload,
+ BinaryString.fromString(key.toString()));
+
+ // Synchronous write ensures the record is durable before returning.
+ // TODO: Optimize throughput via batching + flush() once Fluss
supports it
+ // (see
+ //
https://github.com/apache/fluss/blob/5850c837/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java#L234-L241).
+ // Note: steps affecting recovery correctness must remain synchronous.
+ writer.append(row).get();
+ actionStates.put(stateKey, state);
+
+ LOG.debug("Stored action state: key={}, isCompleted={}", stateKey,
state.isCompleted());
+ }
+
+ @Override
+ public ActionState get(Object key, long seqNum, Action action, Event
event) throws Exception {
+ String stateKey = generateKey(key, seqNum, action, event);
+ String keyPrefix = key.toString() + "_";
+
+ boolean hasDivergence = checkDivergence(key.toString(), seqNum);
+
+ if (!actionStates.containsKey(stateKey) || hasDivergence) {
+ removeStateEntries(keyPrefix, stateSeqNum -> stateSeqNum > seqNum);
+ }
+
+ ActionState state = actionStates.get(stateKey);
+ LOG.debug("Lookup action state: key={}, found={}", stateKey, state !=
null);
+ return state;
+ }
+
+ private boolean checkDivergence(String key, long seqNum) {
+ return actionStates.keySet().stream()
+ .filter(k -> k.startsWith(key + "_" + seqNum + "_"))
+ .count()
+ > 1;
+ }
+
+ /**
+ * Removes cached state entries whose key starts with {@code keyPrefix}
and whose parsed
+ * sequence number satisfies {@code seqNumFilter}.
+ */
+ private void removeStateEntries(String keyPrefix, LongPredicate
seqNumFilter) {
+ actionStates
+ .entrySet()
+ .removeIf(
+ entry -> {
+ if (!entry.getKey().startsWith(keyPrefix)) {
+ return false;
+ }
+ try {
+ List<String> parts =
ActionStateUtil.parseKey(entry.getKey());
+ if (parts.size() >= 2) {
+ long stateSeqNum =
Long.parseLong(parts.get(1));
+ return seqNumFilter.test(stateSeqNum);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to parse state key: {}",
entry.getKey(), e);
+ }
+ return false;
+ });
+ }
+
+ /**
+ * Rebuilds in-memory state by scanning the Fluss log table. If recovery
markers are provided,
+ * computes the minimum offset per bucket across all markers and
subscribes from those offsets.
+ * Otherwise, skips rebuild since there is no checkpointed position to
recover from. Reads from
+ * the start offset up to the latest offset captured at rebuild start. For
the same state key
+ * appearing multiple times in the log, the latest record wins
(last-write-wins).
+ */
+ @Override
+ public void rebuildState(List<Object> recoveryMarkers) {
+ LOG.info(
+ "Rebuilding action state from Fluss log table with {} recovery
markers",
+ recoveryMarkers.size());
+
+ if (recoveryMarkers.isEmpty()) {
+ LOG.info("No recovery markers, skipping state rebuild");
+ return;
+ }
+
+ actionStates.clear();
+
+ Map<Integer, Long> bucketStartOffsets =
mergeRecoveryMarkerOffsets(recoveryMarkers);
+ if (bucketStartOffsets.isEmpty()) {
+ LOG.info("No valid bucket offsets in recovery markers, skipping
state rebuild");
+ return;
+ }
+
+ Map<Integer, Long> bucketEndOffsets = getBucketEndOffsets();
+ Map<Integer, Long> bucketEarliestOffsets = getBucketEarliestOffsets();
+ LOG.debug(
+ "Rebuild window: startOffsets={}, earliestOffsets={},
endOffsets={}",
+ bucketStartOffsets,
+ bucketEarliestOffsets,
+ bucketEndOffsets);
+
+ if (!bucketEndOffsets.keySet().equals(bucketStartOffsets.keySet())) {
+ throw new IllegalStateException(
+ String.format(
+ "Recovery marker bucket set %s does not match the
current table "
+ + "bucket set %s. The table may have been
recreated or the "
+ + "bucket count changed since the last
checkpoint. "
+ + "A fresh start without recovery markers
is required.",
+ bucketStartOffsets.keySet(),
bucketEndOffsets.keySet()));
+ }
+
+ try (LogScanner scanner = table.newScan().createLogScanner()) {
+ Map<Integer, Long> remainingBuckets =
+ subscribeEffectiveOffsets(
+ scanner, bucketStartOffsets, bucketEndOffsets,
bucketEarliestOffsets);
+ LOG.debug("Subscribed buckets for rebuild: {}", remainingBuckets);
+
+ pollAndReplay(scanner, remainingBuckets);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to rebuild state from Fluss log
table", e);
+ }
+
+ LOG.info("Completed rebuilding state, recovered {} states",
actionStates.size());
+ }
+
+ /**
+ * Merges recovery markers into a per-bucket start offset map. For each
bucket, the minimum
+ * offset across all markers is used to cover the widest recovery window.
+ */
+ private Map<Integer, Long> mergeRecoveryMarkerOffsets(List<Object>
recoveryMarkers) {
+ Map<Integer, Long> bucketStartOffsets = new HashMap<>();
+ for (Object marker : recoveryMarkers) {
+ if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
+ for (Map.Entry<Integer, Long> entry : markerMap.entrySet()) {
+ bucketStartOffsets.merge(entry.getKey(), entry.getValue(),
Math::min);
+ }
+ } else if (marker != null) {
+ LOG.warn(
+ "Ignoring unrecognized recovery marker type: {}",
+ marker.getClass().getName());
+ }
+ }
+ return bucketStartOffsets;
+ }
+
+ /**
+ * Validates effective offsets for each bucket and subscribes the scanner.
Buckets with no new
+ * data are skipped; buckets with data loss (retention cleaned the
recovery window) cause an
+ * immediate failure.
+ *
+ * @return a map of bucket-id to end-offset for buckets that need to be
scanned
+ */
+ private Map<Integer, Long> subscribeEffectiveOffsets(
+ LogScanner scanner,
+ Map<Integer, Long> bucketStartOffsets,
+ Map<Integer, Long> bucketEndOffsets,
+ Map<Integer, Long> bucketEarliestOffsets) {
+ Map<Integer, Long> remainingBuckets = new HashMap<>();
+ for (Map.Entry<Integer, Long> entry : bucketStartOffsets.entrySet()) {
+ int bucket = entry.getKey();
+ long startOffset = entry.getValue();
+
+ if (!bucketEndOffsets.containsKey(bucket)
+ || !bucketEarliestOffsets.containsKey(bucket)) {
+ throw new IllegalStateException(
+ String.format(
+ "Recovery marker references bucket %d which
does not exist in "
+ + "the current table (available
buckets: %s). The table "
+ + "may have been recreated or bucket
count changed. "
+ + "A fresh start without recovery
markers is required.",
+ bucket, bucketEndOffsets.keySet()));
+ }
+
+ long endOffset = bucketEndOffsets.get(bucket);
+ long earliestOffset = bucketEarliestOffsets.get(bucket);
+
+ // No new data since checkpoint (includes empty buckets that never
had writes:
+ // endOffset=0, startOffset=0)
+ if (endOffset == startOffset) {
+ LOG.info(
+ "Skipping bucket {} for rebuild: no new data "
+ + "(endOffset={} = startOffset={})",
+ bucket,
+ endOffset,
+ startOffset);
+ continue;
+ }
+
+ // Data loss: retention cleaned the recovery window, or log was
truncated/reset
+ if (earliestOffset > startOffset || endOffset < startOffset) {
+ throw new IllegalStateException(
+ String.format(
+ "Data loss detected for bucket %d: required
data is no longer "
+ + "available in the log
(startOffset=%d, endOffset=%d, "
+ + "earliestOffset=%d). Increase log
retention or reduce "
+ + "checkpoint interval.",
+ bucket, startOffset, endOffset,
earliestOffset));
+ }
+
+ scanner.subscribe(bucket, startOffset);
+ remainingBuckets.put(bucket, endOffset);
+ }
+ return remainingBuckets;
+ }
+
+ /**
+ * Polls the scanner and replays deserialized action states until all
subscribed buckets have
+ * been fully consumed up to their end offsets.
+ */
+ private void pollAndReplay(LogScanner scanner, Map<Integer, Long>
remainingBuckets) {
+ long deadline = System.currentTimeMillis() +
REBUILD_TIMEOUT.toMillis();
+ while (!remainingBuckets.isEmpty()) {
+ if (System.currentTimeMillis() > deadline) {
+ throw new RuntimeException(
+ String.format(
+ "State rebuild timed out after %s. "
+ + "Remaining buckets not fully
consumed: %s",
+ REBUILD_TIMEOUT, remainingBuckets.keySet()));
+ }
+ ScanRecords records = scanner.poll(POLL_TIMEOUT);
+ for (TableBucket bucket : records.buckets()) {
+ Long endOffset = remainingBuckets.get(bucket.getBucket());
+ long lastSeenOffset = replayRecords(records.records(bucket),
endOffset);
+ if (lastSeenOffset + 1 >= endOffset) {
+ remainingBuckets.remove(bucket.getBucket());
+ scanner.unsubscribe(bucket.getBucket());
+ }
+ }
+ }
+ }
+
+ /**
+ * Replays records from a single bucket, deserializing and applying each
action state. Returns
+ * the highest log offset seen (including records past endOffset), used to
detect bucket
+ * completion.
+ */
+ private long replayRecords(Iterable<ScanRecord> records, long endOffset) {
+ long lastSeenOffset = -1;
+ for (ScanRecord record : records) {
+ lastSeenOffset = record.logOffset();
+ if (record.logOffset() >= endOffset) {
+ break;
+ }
+ InternalRow row = record.getRow();
+ String stateKey = row.getString(COL_STATE_KEY).toString();
+ byte[] payload = row.getBytes(COL_STATE_PAYLOAD);
+ ActionState state = ActionStateSerde.deserialize(payload);
+ actionStates.put(stateKey, state);
+ }
+ return lastSeenOffset;
+ }
+
+ private Map<Integer, Long> getBucketEndOffsets() {
+ return getBucketOffsets(new OffsetSpec.LatestSpec());
+ }
+
+ private Map<Integer, Long> getBucketEarliestOffsets() {
+ return getBucketOffsets(new OffsetSpec.EarliestSpec());
+ }
+
+ private Map<Integer, Long> getBucketOffsets(OffsetSpec offsetSpec) {
+ int numBuckets = table.getTableInfo().getNumBuckets();
+ try (Admin admin = connection.getAdmin()) {
+ List<Integer> buckets = new ArrayList<>();
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(b);
+ }
+ return admin.listOffsets(tablePath, buckets,
offsetSpec).all().get();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get offsets for Fluss table:
" + tablePath, e);
+ }
+ }
+
+ /**
+ * Returns the end offsets of each bucket as a recovery marker. Similar to
Kafka's
+ * implementation, this captures the current log position so that {@link
#rebuildState} can
+ * resume from these offsets instead of scanning from the beginning.
+ */
+ @Override
+ public Object getRecoveryMarker() {
+ return getBucketEndOffsets();
+ }
+
+ /**
+ * Evicts pruned states from the in-memory cache. The Fluss log is
append-only; physical cleanup
+ * relies on Fluss log retention configuration.
+ */
+ @Override
+ public void pruneState(Object key, long seqNum) {
+ LOG.debug("Pruning in-memory state for key: {} up to seqNum: {}", key,
seqNum);
+ removeStateEntries(key.toString() + "_", stateSeqNum -> stateSeqNum <=
seqNum);
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ private void maybeCreateDatabaseAndTable() {
+ try (Admin admin = connection.getAdmin()) {
+ if (!admin.databaseExists(databaseName).get()) {
+ admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY,
true).get();
+ LOG.info("Created Fluss database: {}", databaseName);
+ }
+
+ if (!admin.tableExists(tablePath).get()) {
+ // No primaryKey() call — this creates an append-only log
table in Fluss.
+ Schema schema =
+ Schema.newBuilder()
+ .column(COL_NAME_STATE_KEY, DataTypes.STRING())
+ .column(COL_NAME_STATE_PAYLOAD,
DataTypes.BYTES())
+ .column(COL_NAME_AGENT_KEY, DataTypes.STRING())
+ .build();
+
+ int buckets =
agentConfiguration.get(FLUSS_ACTION_STATE_TABLE_BUCKETS);
+ TableDescriptor descriptor =
+ TableDescriptor.builder()
+ .schema(schema)
+ .distributedBy(buckets, COL_NAME_AGENT_KEY)
+ .comment("Flink Agents action state log")
+ .build();
+
+ admin.createTable(tablePath, descriptor, true).get();
+ LOG.info("Created Fluss log table: {}", tablePath);
+ } else {
+ LOG.info("Fluss table {} already exists", tablePath);
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to create or verify Fluss database/table: {}.{}",
+ databaseName,
+ tableName,
+ e);
+ throw new RuntimeException("Failed to create or verify Fluss
database/table", e);
+ }
+ }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
index 648f146b..f09e5cd2 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStore.java
@@ -194,7 +194,9 @@ public class KafkaActionStateStore implements
ActionStateStore {
}
private boolean checkDivergence(String key, long seqNum) {
- return actionStates.keySet().stream().filter(k -> k.startsWith(key +
"_" + seqNum)).count()
+ return actionStates.keySet().stream()
+ .filter(k -> k.startsWith(key + "_" + seqNum + "_"))
+ .count()
> 1;
}
@@ -211,6 +213,7 @@ public class KafkaActionStateStore implements
ActionStateStore {
// Process recovery markers to get the smallest offsets for each
partition
for (Object marker : recoveryMarkers) {
if (marker instanceof Map) {
+ @SuppressWarnings("unchecked")
Map<Integer, Long> markerMap = (Map<Integer, Long>) marker;
for (Map.Entry<Integer, Long> entry :
markerMap.entrySet()) {
Long offset =
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 16842d03..f8d1129a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
+import org.apache.flink.agents.runtime.actionstate.FlussActionStateStore;
import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
import org.apache.flink.agents.runtime.context.ActionStatePersister;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
@@ -48,6 +49,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
+import static
org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.FLUSS;
import static
org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA;
/**
@@ -113,10 +115,15 @@ class DurableExecutionManager implements
ActionStatePersister, AutoCloseable {
* @param config the agent configuration carrying the backend selection.
*/
void maybeInitActionStateStore(AgentConfiguration config) {
- if (actionStateStore == null
- &&
KAFKA.getType().equalsIgnoreCase(config.get(ACTION_STATE_STORE_BACKEND))) {
- LOG.info("Using Kafka as backend of action state store.");
- actionStateStore = new KafkaActionStateStore(config);
+ if (actionStateStore == null) {
+ String backend = config.get(ACTION_STATE_STORE_BACKEND);
+ if (KAFKA.getType().equalsIgnoreCase(backend)) {
+ LOG.info("Using Kafka as backend of action state store.");
+ actionStateStore = new KafkaActionStateStore(config);
+ } else if (FLUSS.getType().equalsIgnoreCase(backend)) {
+ LOG.info("Using Fluss as backend of action state store.");
+ actionStateStore = new FlussActionStateStore(config);
+ }
}
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
index ca510332..e0c4de0b 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateSerdeTest.java
@@ -53,16 +53,13 @@ public class ActionStateSerdeTest {
originalState.addShortTermMemoryUpdate(shortTermMemoryUpdate);
originalState.addEvent(outputEvent);
- // Test Kafka seder/deserializer
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
// Serialize
- byte[] serialized = seder.serialize("test-topic", originalState);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
assertNotNull(serialized);
assertTrue(serialized.length > 0);
// Deserialize
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
assertNotNull(deserializedState);
// Verify taskEvent
@@ -102,10 +99,8 @@ public class ActionStateSerdeTest {
originalState.addSensoryMemoryUpdate(memoryUpdate);
// Test serialization/deserialization
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
// Verify taskEvent is null
assertNull(deserializedState.getTaskEvent());
@@ -128,10 +123,8 @@ public class ActionStateSerdeTest {
ActionState originalState = new ActionState(inputEvent);
// Test serialization/deserialization
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
// Verify complex attributes are preserved
InputEvent deserializedInputEvent = (InputEvent)
deserializedState.getTaskEvent();
@@ -157,10 +150,8 @@ public class ActionStateSerdeTest {
originalState.addCallResult(result2);
// Test serialization/deserialization
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
// Verify call results
assertEquals(2, deserializedState.getCallResultCount());
@@ -186,10 +177,8 @@ public class ActionStateSerdeTest {
ActionState originalState = new ActionState(inputEvent);
originalState.addCallResult(CallResult.pending("module.func",
"digest"));
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
assertEquals(1, deserializedState.getCallResultCount());
CallResult result = deserializedState.getCallResult(0);
@@ -215,10 +204,8 @@ public class ActionStateSerdeTest {
inputEvent, sensoryUpdates, shortTermUpdates,
outputEvents, null, true);
// Test serialization/deserialization
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
// Verify completed flag
assertTrue(deserializedState.isCompleted());
@@ -242,10 +229,8 @@ public class ActionStateSerdeTest {
new ActionState(inputEvent, null, null, null, callResults,
false);
// Test serialization/deserialization
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
// Verify state
assertFalse(deserializedState.isCompleted());
@@ -261,10 +246,8 @@ public class ActionStateSerdeTest {
ActionState originalState = new ActionState(inputEvent);
originalState.addCallResult(new CallResult("func", "digest", null,
null));
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
-
- byte[] serialized = seder.serialize("test-topic", originalState);
- ActionState deserializedState = seder.deserialize("test-topic",
serialized);
+ byte[] serialized = ActionStateSerde.serialize(originalState);
+ ActionState deserializedState =
ActionStateSerde.deserialize(serialized);
assertEquals(1, deserializedState.getCallResultCount());
CallResult result = deserializedState.getCallResult(0);
@@ -310,9 +293,8 @@ public class ActionStateSerdeTest {
+ "\"completed\":false"
+ "}";
- ActionStateKafkaSeder seder = new ActionStateKafkaSeder();
ActionState deserializedState =
- seder.deserialize("test-topic",
json.getBytes(StandardCharsets.UTF_8));
+
ActionStateSerde.deserialize(json.getBytes(StandardCharsets.UTF_8));
assertEquals(2, deserializedState.getCallResultCount());
@@ -326,4 +308,29 @@ public class ActionStateSerdeTest {
assertArrayEquals(
"exception".getBytes(StandardCharsets.UTF_8),
legacyFailure.getExceptionPayload());
}
+
+ @Test
+ public void testKafkaSederDelegatesToActionStateSerde() throws Exception {
+ InputEvent inputEvent = new InputEvent("test delegation");
+ ActionState originalState = new ActionState(inputEvent);
+
+ // Serialize via ActionStateSerde, deserialize via Kafka seder (and
vice versa)
+ ActionStateKafkaSeder kafkaSeder = new ActionStateKafkaSeder();
+
+ byte[] serializedBySerde = ActionStateSerde.serialize(originalState);
+ byte[] serializedByKafka = kafkaSeder.serialize("test-topic",
originalState);
+
+ ActionState fromSerde =
ActionStateSerde.deserialize(serializedByKafka);
+ ActionState fromKafka = kafkaSeder.deserialize("test-topic",
serializedBySerde);
+
+ // Both should produce identical results
+ assertArrayEquals(serializedBySerde, serializedByKafka);
+ assertEquals(
+ ((InputEvent) fromSerde.getTaskEvent()).getInput(),
+ ((InputEvent) fromKafka.getTaskEvent()).getInput());
+
+ // Kafka seder should handle nulls
+ assertNull(kafkaSeder.serialize("test-topic", null));
+ assertNull(kafkaSeder.deserialize("test-topic", null));
+ }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
index a04be38a..2a90c1f1 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtilTest.java
@@ -17,10 +17,7 @@
*/
package org.apache.flink.agents.runtime.actionstate;
-import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.context.RunnerContext;
-import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.junit.jupiter.api.Test;
@@ -38,7 +35,7 @@ public class ActionStateUtilTest {
public void testGenerateKeyConsistency() throws Exception {
// Create test data
Object key = "consistency-test";
- Action action = new TestAction("consistency-action");
+ Action action = new NoOpAction("consistency-action");
InputEvent inputEvent = new InputEvent("same-input");
InputEvent inputEvent2 = new InputEvent("same-input");
@@ -54,7 +51,7 @@ public class ActionStateUtilTest {
public void testGenerateKeyDifferentInputs() throws Exception {
// Create test data
Object key = "diff-test";
- Action action = new TestAction("diff-action");
+ Action action = new NoOpAction("diff-action");
InputEvent inputEvent1 = new InputEvent("input1");
InputEvent inputEvent2 = new InputEvent("input2");
@@ -68,7 +65,7 @@ public class ActionStateUtilTest {
@Test
public void testGenerateKeyWithNullKey() throws Exception {
- Action action = new TestAction("test-action");
+ Action action = new NoOpAction("test-action");
InputEvent inputEvent = new InputEvent("test-input");
assertThrows(
@@ -93,7 +90,7 @@ public class ActionStateUtilTest {
@Test
public void testGenerateKeyWithNullEvent() throws Exception {
Object key = "test-key";
- Action action = new TestAction("test-action");
+ Action action = new NoOpAction("test-action");
assertThrows(
NullPointerException.class,
@@ -106,7 +103,7 @@ public class ActionStateUtilTest {
public void testParseKeyValidKey() throws Exception {
// Create test data and generate a key
Object key = "test-key";
- Action action = new TestAction("test-action");
+ Action action = new NoOpAction("test-action");
InputEvent inputEvent = new InputEvent("test-input");
long seqNum = 123;
@@ -128,7 +125,7 @@ public class ActionStateUtilTest {
public void testParseKeyRoundTrip() throws Exception {
// Test that generate -> parse -> values match original inputs
Object originalKey = "round-trip-test";
- Action action = new TestAction("round-trip-action");
+ Action action = new NoOpAction("round-trip-action");
InputEvent inputEvent = new InputEvent("round-trip-input");
long seqNum = 456;
@@ -176,7 +173,7 @@ public class ActionStateUtilTest {
public void testParseKeyWithSpecialCharacters() throws Exception {
// Test with keys containing special characters (but not the separator)
Object key = "key-with-special@chars#123";
- Action action = new TestAction("action-with-special@chars");
+ Action action = new NoOpAction("action-with-special@chars");
InputEvent inputEvent = new InputEvent("input-with-special@chars");
long seqNum = 789;
@@ -190,7 +187,7 @@ public class ActionStateUtilTest {
@Test
public void testParseKeyConsistencyWithDifferentKeys() throws Exception {
// Generate keys with different inputs and verify parsing consistency
- Action action = new TestAction("consistency-action");
+ Action action = new NoOpAction("consistency-action");
InputEvent inputEvent = new InputEvent("consistency-input");
String key1 = ActionStateUtil.generateKey("key1", 100, action,
inputEvent);
@@ -207,21 +204,4 @@ public class ActionStateUtilTest {
assertEquals(parsed1.get(2), parsed2.get(2)); // Event UUID
assertEquals(parsed1.get(3), parsed2.get(3)); // Action UUID
}
-
- private static class TestAction extends Action {
-
- public static void doNothing(Event event, RunnerContext context) {
- // No operation
- }
-
- public TestAction(String name) throws Exception {
- super(
- name,
- new JavaFunction(
- TestAction.class.getName(),
- "doNothing",
- new Class[] {Event.class, RunnerContext.class}),
- List.of(InputEvent.EVENT_TYPE));
- }
- }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java
new file mode 100644
index 00000000..0d4ddd06
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreIT.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link FlussActionStateStore} against an embedded
Fluss cluster. */
+public class FlussActionStateStoreIT {
+
+ private static final String TEST_DATABASE = "test_flink_agents";
+ private static final String TEST_TABLE = "action_state_it";
+ private static final String TEST_KEY = "test-key";
+
+ @RegisterExtension
+ static final FlussClusterExtension FLUSS_CLUSTER =
+ FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+
+ private FlussActionStateStore store;
+ private Action testAction;
+ private Event testEvent;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ AgentConfiguration config = createAgentConfiguration();
+ store = new FlussActionStateStore(config);
+
+ // Wait for table to be ready in the cluster
+ waitForTableReady();
+
+ testAction = new NoOpAction("test-action");
+ testEvent = new InputEvent("test-data");
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ // ==================== Basic CRUD ====================
+
+ @Test
+ void testPutAndGet() throws Exception {
+ ActionState state = new ActionState(testEvent);
+
+ store.put(TEST_KEY, 1L, testAction, testEvent, state);
+ ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+ assertThat(retrieved).isNotNull();
+ assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+ assertThat(retrieved.isCompleted()).isFalse();
+ }
+
+ @Test
+ void testGetNonExistent() throws Exception {
+ ActionState result = store.get(TEST_KEY, 999L, testAction, testEvent);
+
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testMultipleSeqNums() throws Exception {
+ InputEvent event1 = new InputEvent("data-1");
+ InputEvent event2 = new InputEvent("data-2");
+ InputEvent event3 = new InputEvent("data-3");
+ ActionState state1 = new ActionState(event1);
+ ActionState state2 = new ActionState(event2);
+ ActionState state3 = new ActionState(event3);
+
+ store.put(TEST_KEY, 1L, testAction, testEvent, state1);
+ store.put(TEST_KEY, 2L, testAction, testEvent, state2);
+ store.put(TEST_KEY, 3L, testAction, testEvent, state3);
+
+ assertThat(store.get(TEST_KEY, 1L, testAction,
testEvent).getTaskEvent()).isEqualTo(event1);
+ assertThat(store.get(TEST_KEY, 2L, testAction,
testEvent).getTaskEvent()).isEqualTo(event2);
+ assertThat(store.get(TEST_KEY, 3L, testAction,
testEvent).getTaskEvent()).isEqualTo(event3);
+ }
+
+ @Test
+ void testUpsertOverwrite() throws Exception {
+ ActionState original = new ActionState(new InputEvent("original"));
+ store.put(TEST_KEY, 1L, testAction, testEvent, original);
+
+ InputEvent updatedEvent = new InputEvent("updated");
+ ActionState updated = new ActionState(updatedEvent);
+ store.put(TEST_KEY, 1L, testAction, testEvent, updated);
+
+ ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+ assertThat(retrieved).isNotNull();
+ assertThat(retrieved.getTaskEvent()).isEqualTo(updatedEvent);
+ }
+
+ // ==================== Pruning ====================
+
+ @Test
+ void testPruneSingleKey() throws Exception {
+ store.put(TEST_KEY, 1L, testAction, testEvent, new
ActionState(testEvent));
+ store.put(TEST_KEY, 2L, testAction, testEvent, new
ActionState(testEvent));
+ store.put(TEST_KEY, 3L, testAction, testEvent, new
ActionState(testEvent));
+
+ store.pruneState(TEST_KEY, 2L);
+
+ // pruneState is synchronous (in-memory eviction)
+ // Check surviving entry first: get() with a missing key triggers
divergence cleanup
+ // that removes entries with higher seqNums (same as Kafka backend
behavior).
+ assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNotNull();
+ assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNull();
+ assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNull();
+ }
+
+ // ==================== Recovery ====================
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testRecoveryMarkerReturnsBucketOffsets() {
+ Object marker = store.getRecoveryMarker();
+ assertThat(marker).isNotNull();
+ assertThat(marker).isInstanceOf(Map.class);
+ Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+ // With 1 bucket configured, we should have exactly 1 entry
+ assertThat(bucketOffsets).hasSize(1);
+ assertThat(bucketOffsets).containsKey(0);
+ assertThat(bucketOffsets.get(0)).isGreaterThanOrEqualTo(0L);
+ }
+
+ @Test
+ void testRebuildStateWithEmptyMarkersSkipsRebuild() throws Exception {
+ store.put(TEST_KEY, 1L, testAction, testEvent, new
ActionState(testEvent));
+
+ // rebuildState with empty markers should skip rebuild (aligned with
Kafka backend)
+ store.rebuildState(Collections.emptyList());
+
+ // The in-memory cache is not cleared when rebuild is skipped,
+ // so the state should still be accessible
+ assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testRebuildStateWithRecoveryMarkers() throws Exception {
+ store.put(TEST_KEY, 1L, testAction, testEvent, new
ActionState(testEvent));
+
+ // Capture recovery marker after writing data (simulates checkpoint
boundary)
+ Object marker = store.getRecoveryMarker();
+
+ // Write more data after the marker (simulates writes between
checkpoint and crash)
+ store.put(TEST_KEY, 2L, testAction, testEvent, new
ActionState(testEvent));
+
+ // Close to ensure all writes are fully committed before recovery
+ store.close();
+
+ // Simulate recovery: new store instance
+ FlussActionStateStore recoveredStore =
+ new FlussActionStateStore(createAgentConfiguration());
+ try {
+ // Rebuild using the marker; should replay from marker offset to
current end
+ recoveredStore.rebuildState(List.of(marker));
+
+ // Check surviving entry first: get() with a missing key triggers
divergence cleanup
+ // that removes entries with higher seqNums (same as Kafka backend
behavior).
+ // Data written after the marker should be recovered
+ assertThat(recoveredStore.get(TEST_KEY, 2L, testAction,
testEvent)).isNotNull();
+ // Data written before the marker should NOT be in the rebuilt
cache
+ assertThat(recoveredStore.get(TEST_KEY, 1L, testAction,
testEvent)).isNull();
+ } finally {
+ recoveredStore.close();
+ // Prevent double-close in tearDown
+ store = null;
+ }
+ }
+
+ @Test
+ void testPruneWorksAfterRecovery() throws Exception {
+ // Capture recovery marker BEFORE writing data.
+ Object marker = store.getRecoveryMarker();
+
+ store.put(TEST_KEY, 1L, testAction, testEvent, new
ActionState(testEvent));
+ store.put(TEST_KEY, 2L, testAction, testEvent, new
ActionState(testEvent));
+ store.put(TEST_KEY, 3L, testAction, testEvent, new
ActionState(testEvent));
+ store.close();
+
+ // Simulate recovery: new store instance
+ FlussActionStateStore recoveredStore =
+ new FlussActionStateStore(createAgentConfiguration());
+ try {
+ // Rebuild state from the log using recovery markers
+ recoveredStore.rebuildState(List.of(marker));
+
+ assertThat(recoveredStore.get(TEST_KEY, 1L, testAction,
testEvent)).isNotNull();
+ assertThat(recoveredStore.get(TEST_KEY, 2L, testAction,
testEvent)).isNotNull();
+ assertThat(recoveredStore.get(TEST_KEY, 3L, testAction,
testEvent)).isNotNull();
+
+ recoveredStore.pruneState(TEST_KEY, 2L);
+
+ // Check surviving entry first (get() divergence cleanup
side-effect)
+ assertThat(recoveredStore.get(TEST_KEY, 3L, testAction,
testEvent)).isNotNull();
+ assertThat(recoveredStore.get(TEST_KEY, 1L, testAction,
testEvent)).isNull();
+ assertThat(recoveredStore.get(TEST_KEY, 2L, testAction,
testEvent)).isNull();
+ } finally {
+ recoveredStore.close();
+ }
+ }
+
+ // ==================== Multi-bucket ====================
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void testMultiBucketRecovery() throws Exception {
+ // Use a separate database/table with 4 buckets to test multi-bucket
scenario
+ String multiDb = "test_flink_agents_multi";
+ String multiTable = "action_state_multi";
+ AgentConfiguration multiConfig = createAgentConfiguration(multiDb,
multiTable, 4);
+ FlussActionStateStore multiStore = new
FlussActionStateStore(multiConfig);
+ try {
+ waitForTableReady(multiDb, multiTable);
+
+ // Write states with different keys (likely distributed across
buckets)
+ Action action1 = new NoOpAction("multi-action");
+ for (int i = 0; i < 10; i++) {
+ String key = "multi-key-" + i;
+ multiStore.put(key, 1L, action1, testEvent, new
ActionState(testEvent));
+ }
+
+ // Verify all states are retrievable
+ for (int i = 0; i < 10; i++) {
+ String key = "multi-key-" + i;
+ assertThat(multiStore.get(key, 1L, action1,
testEvent)).isNotNull();
+ }
+
+ // Recovery marker should contain all 4 buckets
+ Object marker = multiStore.getRecoveryMarker();
+ assertThat(marker).isInstanceOf(Map.class);
+ Map<Integer, Long> bucketOffsets = (Map<Integer, Long>) marker;
+ assertThat(bucketOffsets).hasSize(4);
+
+ // Write more data after marker
+ for (int i = 10; i < 15; i++) {
+ String key = "multi-key-" + i;
+ multiStore.put(key, 1L, action1, testEvent, new
ActionState(testEvent));
+ }
+ multiStore.close();
+
+ // Recover into a new store instance
+ FlussActionStateStore recoveredStore = new
FlussActionStateStore(multiConfig);
+ try {
+ recoveredStore.rebuildState(List.of(marker));
+
+ // Data written after marker should be recovered
+ for (int i = 10; i < 15; i++) {
+ String key = "multi-key-" + i;
+ assertThat(recoveredStore.get(key, 1L, action1,
testEvent)).isNotNull();
+ }
+ } finally {
+ recoveredStore.close();
+ }
+ } finally {
+ multiStore.close();
+ // Prevent double-close in tearDown
+ store = null;
+ }
+ }
+
+ // ==================== Helpers ====================
+
+ private AgentConfiguration createAgentConfiguration() {
+ return createAgentConfiguration(TEST_DATABASE, TEST_TABLE, 1);
+ }
+
+ private AgentConfiguration createAgentConfiguration(
+ String database, String table, int buckets) {
+ AgentConfiguration config = new AgentConfiguration();
+ config.set(FLUSS_BOOTSTRAP_SERVERS,
FLUSS_CLUSTER.getBootstrapServers());
+ config.set(FLUSS_ACTION_STATE_DATABASE, database);
+ config.set(FLUSS_ACTION_STATE_TABLE, table);
+ config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, buckets);
+ return config;
+ }
+
+ private void waitForTableReady() throws Exception {
+ waitForTableReady(TEST_DATABASE, TEST_TABLE);
+ }
+
+ private void waitForTableReady(String database, String table) throws
Exception {
+ TablePath tablePath = TablePath.of(database, table);
+ try (org.apache.fluss.client.Connection conn =
+
org.apache.fluss.client.ConnectionFactory.createConnection(
+ FLUSS_CLUSTER.getClientConfig());
+ Admin admin = conn.getAdmin()) {
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+ FLUSS_CLUSTER.waitUntilTableReady(tableInfo.getTableId());
+ }
+ }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
new file mode 100644
index 00000000..547a83fb
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreSaslIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_DATABASE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_ACTION_STATE_TABLE_BUCKETS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_BOOTSTRAP_SERVERS;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_MECHANISM;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_PASSWORD;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SASL_USERNAME;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.FLUSS_SECURITY_PROTOCOL;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for {@link FlussActionStateStore} with SASL/PLAIN
authentication against an
+ * embedded Fluss cluster.
+ */
+public class FlussActionStateStoreSaslIT {
+
+ private static final String TEST_DATABASE = "test_flink_agents_sasl";
+ private static final String TEST_TABLE = "action_state_sasl_it";
+ private static final String TEST_KEY = "test-key";
+ private static final String SASL_USERNAME = "testuser";
+ private static final String SASL_PASSWORD = "testpass";
+
+ @RegisterExtension
+ static final FlussClusterExtension FLUSS_CLUSTER =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(1)
+ .setCoordinatorServerListeners("FLUSS://localhost:0,
CLIENT://localhost:0")
+ .setTabletServerListeners("FLUSS://localhost:0,
CLIENT://localhost:0")
+ .setClusterConf(createSaslClusterConfig())
+ .build();
+
+ private FlussActionStateStore store;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ AgentConfiguration config = createSaslAgentConfiguration();
+ store = new FlussActionStateStore(config);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ @Test
+ void testPutAndGetWithSaslAuth() throws Exception {
+ Action testAction = new NoOpAction("sasl-action");
+ InputEvent testEvent = new InputEvent("sasl-data");
+ ActionState state = new ActionState(testEvent);
+
+ store.put(TEST_KEY, 1L, testAction, testEvent, state);
+ ActionState retrieved = store.get(TEST_KEY, 1L, testAction, testEvent);
+
+ assertThat(retrieved).isNotNull();
+ assertThat(retrieved.getTaskEvent()).isEqualTo(testEvent);
+ }
+
+ @Test
+ void testRecoveryWithSaslAuth() throws Exception {
+ Action testAction = new NoOpAction("sasl-recovery-action");
+ InputEvent testEvent = new InputEvent("sasl-recovery-data");
+
+ // Write data and capture recovery marker
+ store.put(TEST_KEY, 1L, testAction, testEvent, new
ActionState(testEvent));
+ Object marker = store.getRecoveryMarker();
+
+ // Write more data after marker
+ store.put(TEST_KEY, 2L, testAction, testEvent, new
ActionState(testEvent));
+ store.close();
+
+ // Recover into a new store instance with SASL
+ FlussActionStateStore recoveredStore =
+ new FlussActionStateStore(createSaslAgentConfiguration());
+ try {
+ recoveredStore.rebuildState(java.util.List.of(marker));
+
+ // Data written after marker should be recovered
+ assertThat(recoveredStore.get(TEST_KEY, 2L, testAction,
testEvent)).isNotNull();
+ // Data written before marker should NOT be in the rebuilt cache
+ assertThat(recoveredStore.get(TEST_KEY, 1L, testAction,
testEvent)).isNull();
+ } finally {
+ recoveredStore.close();
+ store = null;
+ }
+ }
+
+ @Test
+ void testMultipleWritesWithSaslAuth() throws Exception {
+ Action testAction = new NoOpAction("sasl-multi-action");
+ InputEvent testEvent = new InputEvent("sasl-multi-data");
+
+ for (int i = 0; i < 5; i++) {
+ store.put("key-" + i, (long) i, testAction, testEvent, new
ActionState(testEvent));
+ }
+
+ for (int i = 0; i < 5; i++) {
+ assertThat(store.get("key-" + i, (long) i, testAction,
testEvent)).isNotNull();
+ }
+ }
+
+ // ==================== Helpers ====================
+
+ private static Configuration createSaslClusterConfig() {
+ Configuration conf = new Configuration();
+ conf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(),
"CLIENT:sasl");
+ conf.setString("security.sasl.enabled.mechanisms", "plain");
+ conf.setString(
+ "security.sasl.plain.jaas.config",
+ "org.apache.fluss.security.auth.sasl.plain.PlainLoginModule
required "
+ + " user_"
+ + SASL_USERNAME
+ + "=\""
+ + SASL_PASSWORD
+ + "\";");
+ return conf;
+ }
+
+ private AgentConfiguration createSaslAgentConfiguration() {
+ AgentConfiguration config = new AgentConfiguration();
+ String bootstrapServers =
+ String.join(
+ ",",
+ FLUSS_CLUSTER
+ .getClientConfig("CLIENT")
+ .get(ConfigOptions.BOOTSTRAP_SERVERS));
+ config.set(FLUSS_BOOTSTRAP_SERVERS, bootstrapServers);
+ config.set(FLUSS_ACTION_STATE_DATABASE, TEST_DATABASE);
+ config.set(FLUSS_ACTION_STATE_TABLE, TEST_TABLE);
+ config.set(FLUSS_ACTION_STATE_TABLE_BUCKETS, 1);
+ config.set(FLUSS_SECURITY_PROTOCOL, "SASL");
+ config.set(FLUSS_SASL_MECHANISM, "PLAIN");
+ config.set(FLUSS_SASL_USERNAME, SASL_USERNAME);
+ config.set(FLUSS_SASL_PASSWORD, SASL_PASSWORD);
+ return config;
+ }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
new file mode 100644
index 00000000..308ad3d4
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/FlussActionStateStoreTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.actions.Action;
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.row.InternalRow;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link FlussActionStateStore} store-level behavior. */
+public class FlussActionStateStoreTest {
+
+ private static final String TEST_KEY = "test-key";
+
+ private AppendWriter mockWriter;
+ private FlussActionStateStore store;
+ private Action testAction;
+ private Event testEvent;
+ private ActionState testActionState;
+ private Map<String, ActionState> actionStates;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mockWriter = mock(AppendWriter.class);
+ when(mockWriter.append(any(InternalRow.class)))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ actionStates = new HashMap<>();
+ store =
+ new FlussActionStateStore(
+ actionStates, mock(Connection.class),
mock(Table.class), mockWriter);
+
+ testAction = new NoOpAction("test-action");
+ testEvent = new InputEvent("test data");
+ testActionState = new ActionState(testEvent);
+ }
+
+ @Test
+ void testPutActionState() throws Exception {
+ store.put(TEST_KEY, 1L, testAction, testEvent, testActionState);
+
+ verify(mockWriter).append(any(InternalRow.class));
+
+ String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L,
testAction, testEvent);
+ assertThat(actionStates).containsKey(stateKey);
+ assertThat(actionStates.get(stateKey)).isEqualTo(testActionState);
+ }
+
+ @Test
+ void testPutActionStateWriterFailure() throws Exception {
+ when(mockWriter.append(any(InternalRow.class)))
+ .thenReturn(CompletableFuture.failedFuture(new
IOException("connection lost")));
+
+ FlussActionStateStore failStore =
+ new FlussActionStateStore(
+ actionStates, mock(Connection.class),
mock(Table.class), mockWriter);
+
+ assertThatThrownBy(
+ () -> failStore.put(TEST_KEY, 1L, testAction,
testEvent, testActionState))
+ .isInstanceOf(Exception.class);
+
+ // Cache should NOT be updated on write failure
+ String stateKey = ActionStateUtil.generateKey(TEST_KEY, 1L,
testAction, testEvent);
+ assertThat(actionStates).doesNotContainKey(stateKey);
+ }
+
+ @Test
+ void testGetTriggersDivergenceCleanup() throws Exception {
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 1L, testAction,
testEvent), testActionState);
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 2L, testAction,
testEvent), testActionState);
+ // diverge: same key+seqNum, different action
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 2L, new
NoOpAction("test-2"), testEvent),
+ testActionState);
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 3L, testAction,
testEvent), testActionState);
+
+ store.get(TEST_KEY, 2L, new NoOpAction("test-1"), testEvent);
+
+ // Divergence detected at seqNum 2 → removeIf clears seqNum > 2
+ assertThat(store.get(TEST_KEY, 1L, testAction, testEvent)).isNotNull();
+ assertThat(store.get(TEST_KEY, 2L, testAction, testEvent)).isNotNull();
+ assertThat(store.get(TEST_KEY, 3L, testAction, testEvent)).isNull();
+ }
+
+ // ==================== rebuildState tests ====================
+
+ @Test
+ void testRebuildStateSkipsOnEmptyMarkers() throws Exception {
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 1L, testAction,
testEvent), testActionState);
+
+ store.rebuildState(Collections.emptyList());
+
+ // Empty markers → rebuild is skipped, cache is NOT cleared
+ assertThat(actionStates).isNotEmpty();
+ }
+
+ @Test
+ void testRebuildStateSkipsOnNonMapMarker() throws Exception {
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 1L, testAction,
testEvent), testActionState);
+
+ // A non-Map marker is ignored, resulting in empty bucketStartOffsets.
+ // Note: rebuildState clears the cache before checking offsets,
+ // so the cache will be empty even though no actual rebuild occurs.
+ store.rebuildState(List.of("invalid-marker"));
+
+ assertThat(actionStates).isEmpty();
+ }
+
+ @Test
+ void testRebuildStateSkipsOnEmptyBucketOffsets() throws Exception {
+ actionStates.put(
+ ActionStateUtil.generateKey(TEST_KEY, 1L, testAction,
testEvent), testActionState);
+
+ // Empty map marker → no valid bucket offsets.
+ // Same as above: cache is cleared before the early-return check.
+ store.rebuildState(List.of(Map.of()));
+
+ assertThat(actionStates).isEmpty();
+ }
+
+ @Test
+ void testCloseClosesResources() throws Exception {
+ Table mockTable = mock(Table.class);
+ Connection mockConnection = mock(Connection.class);
+
+ FlussActionStateStore closeableStore =
+ new FlussActionStateStore(actionStates, mockConnection,
mockTable, mockWriter);
+
+ closeableStore.close();
+
+ verify(mockTable).close();
+ verify(mockConnection).close();
+ }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
index 32d62f77..7cf829c0 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/KafkaActionStateStoreTest.java
@@ -19,9 +19,7 @@ package org.apache.flink.agents.runtime.actionstate;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.InputEvent;
-import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.plan.AgentConfiguration;
-import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
@@ -76,7 +74,7 @@ public class KafkaActionStateStoreTest {
TEST_TOPIC);
// Create test objects
- testAction = new TestAction("test-action");
+ testAction = new NoOpAction("test-action");
testEvent = new InputEvent("test data");
testActionState = new ActionState(testEvent);
}
@@ -107,7 +105,7 @@ public class KafkaActionStateStoreTest {
actionStates.put(
ActionStateUtil.generateKey(TEST_KEY, 4L, testAction,
testEvent), testActionState);
- actionStateStore.get(TEST_KEY, 2L, new TestAction("test-1"),
testEvent);
+ actionStateStore.get(TEST_KEY, 2L, new NoOpAction("test-1"),
testEvent);
assertNotNull(actionStateStore.get(TEST_KEY, 1L, testAction,
testEvent));
assertNotNull(actionStateStore.get(TEST_KEY, 2L, testAction,
testEvent));
@@ -123,7 +121,7 @@ public class KafkaActionStateStoreTest {
ActionStateUtil.generateKey(TEST_KEY, 2L, testAction,
testEvent), testActionState);
// diverge here
actionStates.put(
- ActionStateUtil.generateKey(TEST_KEY, 2L, new
TestAction("test-2"), testEvent),
+ ActionStateUtil.generateKey(TEST_KEY, 2L, new
NoOpAction("test-2"), testEvent),
testActionState);
actionStates.put(
ActionStateUtil.generateKey(TEST_KEY, 3L, testAction,
testEvent), testActionState);
@@ -256,21 +254,4 @@ public class KafkaActionStateStoreTest {
ActionStateUtil.generateKey(TEST_KEY, 3L,
testAction, testEvent)))
.isEqualTo(thirdState);
}
-
- private static class TestAction extends Action {
-
- public static void doNothing(Event event, RunnerContext context) {
- // No operation
- }
-
- public TestAction(String name) throws Exception {
- super(
- name,
- new JavaFunction(
- TestAction.class.getName(),
- "doNothing",
- new Class[] {Event.class, RunnerContext.class}),
- List.of(InputEvent.EVENT_TYPE));
- }
- }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
new file mode 100644
index 00000000..cfc1f5d2
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/actionstate/NoOpAction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.JavaFunction;
+import org.apache.flink.agents.plan.actions.Action;
+
+import java.util.List;
+
+/** A no-op {@link Action} for use in action state store tests. */
+public class NoOpAction extends Action {
+
+ public static void doNothing(Event event, RunnerContext context) {
+ // No operation
+ }
+
+ public NoOpAction(String name) throws Exception {
+ super(
+ name,
+ new JavaFunction(
+ NoOpAction.class.getName(),
+ "doNothing",
+ new Class[] {Event.class, RunnerContext.class}),
+ List.of(InputEvent.class.getName()));
+ }
+}