codenohup commented on code in PR #80:
URL: https://github.com/apache/flink-agents/pull/80#discussion_r2241466900


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java:
##########
@@ -116,7 +137,23 @@ public void open() throws Exception {
                         TypeInformation.of(String.class),
                         TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
         shortTermMemState = getRuntimeContext().getMapState(shortTermMemStateDescriptor);
-        runnerContext = new RunnerContextImpl(shortTermMemState, this::checkMailboxThread);
+
+        // init agent processing related state
+        actionTasksState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "actionTasks", TypeInformation.of(ActionTask.class)));
+        pendingInputEventsState =
+                getRuntimeContext()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "pendingInputEvents", TypeInformation.of(Event.class)));
+        currentProcessingKeysOperatorState =
+                getOperatorStateBackend()
+                        .getListState(
+                                new ListStateDescriptor<>(
+                                        "currentProcessingKeys", TypeInformation.of(Object.class)));

Review Comment:
   > 2. How do we handle the redistribution of the operator state?
   
   I suggest changing _currentProcessingKeysOperatorState_ to a union list state (`getUnionListState`). With union redistribution, every subtask receives the complete list of keys on restore, so no key is lost after a parallelism change or a restart. After that, each subtask can simply iterate through the list and clean up any keys that fall outside its current key range.
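
   To make the cleanup step concrete, here is a rough, Flink-free sketch of the filtering. The class and method names (`UnionKeyFilter`, `retainOwnedKeys`, `keyGroupOf`) are hypothetical, and `keyGroupOf` is a simplified stand-in: in the operator I would expect the key group to come from `KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism)`, which applies a murmur hash to `hashCode()` first. The range arithmetic follows how I understand Flink splits key groups across subtasks, so please double-check it against the real utility before relying on it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: filters a union-restored key list down to the keys a subtask owns. */
final class UnionKeyFilter {

    /** Simplified stand-in for Flink's key-group assignment (assumption: no murmur hash here). */
    static int keyGroupOf(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** First key group owned by the given subtask (inclusive). */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group owned by the given subtask (inclusive). */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /**
     * Keeps only the keys whose key group falls inside this subtask's range.
     * restoredKeys models the full list every subtask sees after a union restore.
     */
    static List<Object> retainOwnedKeys(
            List<Object> restoredKeys, int maxParallelism, int parallelism, int subtaskIndex) {
        int start = rangeStart(maxParallelism, parallelism, subtaskIndex);
        int end = rangeEnd(maxParallelism, parallelism, subtaskIndex);
        List<Object> owned = new ArrayList<>();
        for (Object key : restoredKeys) {
            int keyGroup = keyGroupOf(key, maxParallelism);
            if (keyGroup >= start && keyGroup <= end) {
                owned.add(key);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // Every subtask restores the same full list and keeps only its own share.
        List<Object> restored = Arrays.asList(1, 63, 64, 127);
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println(subtask + " -> " + retainOwnedKeys(restored, 128, 2, subtask));
        }
    }
}
```

   In the operator itself, the filtered list would then be written back to the state (for example via `ListState#update`) so that each key is tracked by exactly one subtask after rescaling.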
   
   


