Sxnan commented on code in PR #138:
URL: https://github.com/apache/flink-agents/pull/138#discussion_r2352319740


##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/** Utility class for action state related operations. */
+public class ActionStateUtil {
+
+    public static String generateKey(Object key, Action action, Event event) {
+        return key
+                + "-"
+                + UUID.nameUUIDFromBytes(

Review Comment:
   I had an offline discussion with @xintongsong about the key of the event. Here is what we agreed on:
   
   We can only ensure per-action state consistency on a best-effort basis, subject to the following limitations:
   1. The order and the content of input events for each key must be the same before and after recovery.
   2. An action cannot depend on the state of another action if both can be triggered by the same event.
   
   Here is what the implementation should look like:
   1. Only InputEvents, and PythonEvents whose type name is `flink_agents.api.events.event.InputEvent`, need a consistent ID.
   2. The ID of those events should be computed from both the content of the event and a per-key sequence number.
   3. Whenever no action state can be found for a given action state key, which means that the input events arrived in a different order after recovery, we stop looking up action state for all subsequent input events with the same key.
   
   @xintongsong let me know if I missed anything.
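   A minimal sketch of points 2 and 3, assuming the event content can be serialized to a string. `InputEventIdSketch`, `nextInputEventId`, `shouldLookup`, and `onLookupMiss` are hypothetical names; in the operator, the counters and the disabled-key set would presumably have to live in keyed state rather than plain maps. Deciding which events need a consistent ID (point 1) is left out. Like `ActionStateUtil.generateKey`, it derives the ID with `UUID.nameUUIDFromBytes`.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper: derives input event IDs from content plus a per-key sequence
// number, and stops action state lookups for a key after the first miss.
class InputEventIdSketch {

    // Number of input events seen so far per key (would need to be keyed state in practice).
    private final Map<Object, Long> sequenceNumbers = new HashMap<>();

    // Keys for which a lookup miss was observed after recovery.
    private final Set<Object> lookupDisabledKeys = new HashSet<>();

    /** Returns a deterministic ID built from the next sequence number for the key and the event content. */
    UUID nextInputEventId(Object key, String eventContent) {
        long seq = sequenceNumbers.merge(key, 1L, Long::sum);
        byte[] name = (seq + ":" + eventContent).getBytes(StandardCharsets.UTF_8);
        return UUID.nameUUIDFromBytes(name);
    }

    /** Whether action state should still be looked up for events of this key. */
    boolean shouldLookup(Object key) {
        return !lookupDisabledKeys.contains(key);
    }

    /** A miss means the input order diverged after recovery: skip lookups for this key from now on. */
    void onLookupMiss(Object key) {
        lookupDisabledKeys.add(key);
    }

    public static void main(String[] args) {
        InputEventIdSketch original = new InputEventIdSketch();
        InputEventIdSketch recovered = new InputEventIdSketch();

        UUID first = original.nextInputEventId("user-1", "hello");
        UUID replayed = recovered.nextInputEventId("user-1", "hello");
        UUID repeated = original.nextInputEventId("user-1", "hello");

        System.out.println(first.equals(replayed)); // true: same content, same position
        System.out.println(first.equals(repeated)); // false: same content, next sequence number
    }
}
```

   Replaying the same events in the same order yields the same IDs, while a repeated event under the same key gets a fresh ID because its sequence number has advanced.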



##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -23,4 +23,8 @@ public class AgentConfigOptions {
     /** The config parameter specifies the directory for the FileEvent file. */
     public static final ConfigOption<String> BASE_LOG_DIR =
             new ConfigOption<>("baseLogDir", String.class, null);
+
+    /** The config parameter specifies the backend for action state store. */
+    public static final ConfigOption<String> ACTION_STATE_STORE_BACKEND =
+            new ConfigOption<>("actionStateStoreBackend", String.class, "inmemory");

Review Comment:
   The default value could be null, which would disable action state altogether.
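   A minimal sketch of what a null default could mean at operator setup, assuming the configuration can be read as a plain string map. `createStore`, `Store`, and `InMemoryStore` are hypothetical stand-ins for the real config plumbing, `ActionStateStore`, and `InMemoryActionStateStore`; returning null lines up with the existing `actionStateStore != null` check in `snapshotState`.

```java
import java.util.Map;

// Hypothetical sketch: with a null default for actionStateStoreBackend, no store is
// created unless a backend is configured explicitly.
class ActionStateBackendSketch {

    // Stand-ins for ActionStateStore and InMemoryActionStateStore from the PR.
    interface Store {}

    static final class InMemoryStore implements Store {}

    /** Returns null when no backend is configured, i.e. action state is disabled. */
    static Store createStore(Map<String, String> config) {
        String backend = config.get("actionStateStoreBackend"); // null when unset
        if (backend == null) {
            return null;
        }
        if (backend.equals("inmemory")) {
            return new InMemoryStore();
        }
        throw new IllegalArgumentException("Unknown action state store backend: " + backend);
    }

    public static void main(String[] args) {
        System.out.println(createStore(Map.of()) == null); // true: action state disabled
        System.out.println(
                createStore(Map.of("actionStateStoreBackend", "inmemory")) instanceof InMemoryStore); // true
    }
}
```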



##########
runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/InMemoryActionStateStore.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.actionstate;
+
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.plan.Action;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.agents.runtime.actionstate.ActionStateUtil.generateKey;
+
+public class InMemoryActionStateStore implements ActionStateStore {

Review Comment:
   If this cannot be used in production in any way, how about moving it to the test code?



##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -392,6 +440,29 @@ public void close() throws Exception {
         super.close();
     }
 
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        if (actionStateStore != null) {
+            Object recoveryMarker = actionStateStore.getRecoveryMarker();

Review Comment:
   IIUC, there should be a place where we pass the recovery marker to the action state store so that it knows where to replay from.
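   A minimal sketch of that round trip, with Flink operator state modeled as a plain list. Only `getRecoveryMarker()` appears in the diff; `rebuildState(List<Object>)` and the toy `CountingStore`, whose marker is just a write count, are hypothetical names for the missing step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the recovery-marker round trip. Operator state is modeled
// as a plain list; rebuildState(...) is an assumed name for the missing step.
class RecoveryMarkerSketch {

    interface Store {
        Object getRecoveryMarker();

        void rebuildState(List<Object> recoveryMarkers);
    }

    // Toy store whose recovery marker is simply the number of writes so far.
    static final class CountingStore implements Store {
        long writes;
        final List<Object> restoredMarkers = new ArrayList<>();

        @Override
        public Object getRecoveryMarker() {
            return writes;
        }

        @Override
        public void rebuildState(List<Object> recoveryMarkers) {
            // A real store would replay from these markers; here we only record them.
            restoredMarkers.addAll(recoveryMarkers);
        }
    }

    final Store store;
    // Stand-in for the operator's list state holding markers in the checkpoint.
    final List<Object> checkpointedMarkers = new ArrayList<>();

    RecoveryMarkerSketch(Store store) {
        this.store = store;
    }

    /** Like snapshotState(): capture the store's current marker into operator state. */
    void snapshotState() {
        checkpointedMarkers.clear();
        checkpointedMarkers.add(store.getRecoveryMarker());
    }

    /** Like initializeState() on restore: hand the markers back so the store knows where to replay from. */
    void initializeState(List<Object> restoredMarkers) {
        if (!restoredMarkers.isEmpty()) {
            store.rebuildState(restoredMarkers);
        }
    }

    public static void main(String[] args) {
        CountingStore before = new CountingStore();
        before.writes = 3;
        RecoveryMarkerSketch operator = new RecoveryMarkerSketch(before);
        operator.snapshotState();

        // Simulated restart: a fresh store receives the checkpointed markers.
        CountingStore after = new CountingStore();
        new RecoveryMarkerSketch(after).initializeState(operator.checkpointedMarkers);
        System.out.println(after.restoredMarkers); // [3]
    }
}
```

   Because the markers are restored as a list, a subtask may receive several of them after rescaling, so the store would still need a policy for combining them; the sketch simply forwards them all.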


