This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a4c858b [api] Defensive-copy collection inputs in user-constructed events + add contract test (#677)
2a4c858b is described below

commit 2a4c858bf51c1b293c417a46db2362f2a8f32d2d
Author: Avichay Marciano <[email protected]>
AuthorDate: Sat May 16 15:55:05 2026 +0300

    [api] Defensive-copy collection inputs in user-constructed events + add contract test (#677)
    
    Wrap the List arg in new ArrayList<>() in the constructors of the events
    that user code instantiates directly: ChatRequestEvent (messages) and
    ContextRetrievalResponseEvent (documents). This avoids the JDK 17+ Kryo
    InaccessibleObjectException when callers pass List.of(...) or any other
    ImmutableCollections instance.
    
    ToolRequestEvent / ToolResponseEvent are intentionally not changed -
    they are produced only by built-in framework actions, never by user code,
    so adding defensive copies there only adds overhead. (And shallow copies
    wouldn't help anyway: tool-call payloads can carry nested Map.of(...).)
    
    EventKryoSerializationTest in the e2e integration module asserts the
    defensive-copy contract directly: the stored collection must be mutable.
    Two tests, one per affected event.
    
    Co-authored-by: Avichay Marciano <[email protected]>
---
 .../flink/agents/api/event/ChatRequestEvent.java   |  2 +-
 .../api/event/ContextRetrievalResponseEvent.java   |  2 +-
 .../test/EventKryoSerializationTest.java           | 83 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 2 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
index 9c1d2e3c..b1cdcdbc 100644
--- a/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ChatRequestEvent.java
@@ -42,7 +42,7 @@ public class ChatRequestEvent extends Event {
             String model, List<ChatMessage> messages, @Nullable Object 
outputSchema) {
         super(EVENT_TYPE);
         setAttr("model", model);
-        setAttr("messages", messages);
+        setAttr("messages", new ArrayList<>(messages));
         if (outputSchema != null) {
             setAttr("output_schema", outputSchema);
         }
diff --git a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
index 2a52a5d1..35861ce2 100644
--- a/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
+++ b/api/src/main/java/org/apache/flink/agents/api/event/ContextRetrievalResponseEvent.java
@@ -40,7 +40,7 @@ public class ContextRetrievalResponseEvent extends Event {
         super(EVENT_TYPE);
         setAttr("request_id", requestId);
         setAttr("query", query);
-        setAttr("documents", documents);
+        setAttr("documents", new ArrayList<>(documents));
     }
 
     public ContextRetrievalResponseEvent(UUID id, Map<String, Object> 
attributes) {
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java
new file mode 100644
index 00000000..8e48d061
--- /dev/null
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/EventKryoSerializationTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integration.test;
+
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ContextRetrievalResponseEvent;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+/**
+ * Verifies that the user-constructed Event classes which carry collection-typed payloads remain
+ * Kryo-friendly when constructed with immutable collections (e.g. List.of(...)).
+ *
+ * <p>On JDK 17+, Flink's Kryo serializer cannot reflectively access internal fields of {@code
+ * java.util.ImmutableCollections} subclasses without {@code --add-opens
+ * java.base/java.util=ALL-UNNAMED}. The fix is to defensive-copy the input collection at event
+ * construction time so the stored collection is always a plain {@link java.util.ArrayList}, which
+ * Kryo handles natively.
+ *
+ * <p>Scope: this test only covers {@link ChatRequestEvent} and {@link
+ * ContextRetrievalResponseEvent}, which are constructed directly by user code. {@code
+ * ToolRequestEvent} and {@code ToolResponseEvent} are intentionally excluded because they are
+ * constructed only by built-in framework actions, not by users, so defensive copies on those events
+ * would only add overhead without protecting any real caller.
+ *
+ * <p>This test asserts the contract directly: the stored collection must be mutable, i.e. {@code
+ * add()} must not throw {@link UnsupportedOperationException}, which is exactly what would happen
+ * if the immutable input were stored by reference.
+ */
+public class EventKryoSerializationTest {
+
+    @Test
+    void chatRequestEventDefensiveCopiesImmutableList() {
+        ChatRequestEvent event =
+                new ChatRequestEvent(
+                        "myModel", List.of(new ChatMessage(MessageRole.USER, 
"hello")));
+
+        assertDoesNotThrow(
+                () -> event.getMessages().add(new 
ChatMessage(MessageRole.USER, "world")),
+                "ChatRequestEvent must defensive-copy its messages list");
+    }
+
+    @Test
+    void contextRetrievalResponseEventDefensiveCopiesImmutableList() {
+        UUID requestId = UUID.randomUUID();
+        ContextRetrievalResponseEvent event =
+                new ContextRetrievalResponseEvent(
+                        requestId,
+                        "what is flink agents?",
+                        List.of(
+                                new Document(
+                                        "Apache Flink Agents is a streaming 
agent framework.",
+                                        Map.of(),
+                                        "doc-1")));
+
+        assertDoesNotThrow(
+                () -> event.getDocuments().add(new Document("extra", Map.of(), 
"doc-2")),
+                "ContextRetrievalResponseEvent must defensive-copy its 
documents list");
+    }
+}

Reply via email to