purushah commented on code in PR #852:
URL: https://github.com/apache/flink-agents/pull/852#discussion_r3494568307


##########
api/src/main/java/org/apache/flink/agents/api/chat/model/routing/ChatModelRouter.java:
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.chat.model.routing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.ResourceContext;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A pluggable LLM router that selects, per request, which underlying chat model should serve it.
+ *
+ * <p>The router is a drop-in {@link BaseChatModelSetup}: it reports {@link ResourceType#CHAT_MODEL}
+ * and is resolved by the built-in chat action exactly like any other model, so an agent points at
+ * the router by name and nothing else in the framework needs to change. Concerns are layered:
+ * selection ({@link RoutingStrategy}) decides the model, optional {@link CachingStrategy} memoizes
+ * that decision per conversation, and {@link FallbackPolicy} decides what to try if the chosen
+ * model errors. The router then delegates to the chosen model's own {@code chat(...)}, preserving
+ * its prompt, tools, parameters, and token metrics.
+ *
+ * <h2>Configuration ({@link ResourceDescriptor} arguments)</h2>
+ *
+ * <ul>
+ *   <li>{@code candidates} (required) — routable models; each a {@link RoutingCandidate}, a {@link
+ *       String} name, or a {@link Map} with {@code name}/{@code description}/{@code metadata}.
+ *       Every name must reference a registered {@code CHAT_MODEL} resource.
+ *   <li>{@code strategy} (required) — a {@link ResourceDescriptor} naming the {@link
+ *       RoutingStrategy} implementation and its args (use {@link
+ *       org.apache.flink.agents.api.resource.ResourceName.RoutingStrategy} for built-ins). The
+ *       class needs a public {@code (ResourceDescriptor, ResourceContext)} constructor (see {@link
+ *       AbstractRoutingStrategy}); it is loaded with the thread context classloader so user classes
+ *       resolve on a cluster.
+ *   <li>{@code fallback} (optional, default {@code false}) — when {@code true}, a failing model
+ *       falls back to the remaining candidates in declaration order.
+ *   <li>{@code cache} (optional, default {@code true}) — memoize the selection per conversation.
+ *   <li>{@code cache_size} (optional, default {@link CachingStrategy#DEFAULT_MAX_ENTRIES}) — LRU
+ *       capacity when caching is enabled.
+ *   <li>{@code default} (optional) — the candidate used when the strategy abstains or names a
+ *       non-candidate (a routing miss). Must be one of {@code candidates}; defaults to the first
+ *       candidate.
+ * </ul>
+ *
+ * <p><b>Graceful degrade:</b> if the strategy returns {@code null} ("no opinion", e.g. a transient
+ * LLM-judge failure) or a name that is not a configured candidate, the router treats it as a
+ * routing miss and serves the request from {@code default} (then the fallback order) rather than
+ * failing.
+ *
+ * <p><b>Bash/skill tool args (v1 scope):</b> the built-in chat action injects bash allowlists and
+ * skill directories from the resource resolved by the agent's model name — i.e. this router — not
+ * the chosen backend. So configure {@code allowed_commands} / {@code allowed_script_dirs} / {@code
+ * skills} <b>on the router</b>; per-candidate skills/allowlists are not supported in v1.
+ *
+ * <p><b>Metrics note (v1):</b> retry metrics recorded by the built-in chat action are grouped under
+ * this router's connection label ({@code "router"}), not the backend model actually used.
+ * Per-backend attribution is a documented follow-up.
+ */
+public class ChatModelRouter extends BaseChatModelSetup {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChatModelRouter.class);
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    public static final String ARG_CANDIDATES = "candidates";
+    public static final String ARG_STRATEGY = "strategy";
+    public static final String ARG_FALLBACK = "fallback";
+    public static final String ARG_CACHE = "cache";
+    public static final String ARG_CACHE_SIZE = "cache_size";
+    public static final String ARG_DEFAULT = "default";
+
+    private final List<RoutingCandidate> candidates;
+    private final Set<String> candidateNames;
+    private final RoutingStrategy strategy;
+    private final FallbackPolicy fallbackPolicy;
+    private final boolean fallbackEnabled;
+    private final String defaultCandidate;
+
+    @SuppressWarnings("unchecked")
+    public ChatModelRouter(ResourceDescriptor descriptor, ResourceContext 
resourceContext) {
+        super(descriptor, resourceContext);
+
+        List<Object> rawCandidates = descriptor.getArgument(ARG_CANDIDATES);
+        if (rawCandidates == null || rawCandidates.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "ChatModelRouter requires a non-empty 'candidates' 
argument.");
+        }
+        List<RoutingCandidate> parsed = new ArrayList<>(rawCandidates.size());
+        Set<String> names = new LinkedHashSet<>();
+        for (Object spec : rawCandidates) {
+            RoutingCandidate candidate = RoutingCandidate.from(spec);
+            parsed.add(candidate);
+            names.add(candidate.getName());
+        }
+        this.candidates = Collections.unmodifiableList(parsed);
+        this.candidateNames = Collections.unmodifiableSet(names);
+
+        ResourceDescriptor strategyDescriptor =
+                toResourceDescriptor(descriptor.getArgument(ARG_STRATEGY));
+        RoutingStrategy base = instantiateStrategy(strategyDescriptor, 
resourceContext);
+
+        boolean cache = descriptor.getArgument(ARG_CACHE, Boolean.TRUE);
+        if (cache) {
+            int cacheSize =
+                    descriptor.getArgument(ARG_CACHE_SIZE, 
CachingStrategy.DEFAULT_MAX_ENTRIES);
+            this.strategy = new CachingStrategy(base, cacheSize);
+        } else {
+            this.strategy = base;
+        }
+
+        this.fallbackEnabled = descriptor.getArgument(ARG_FALLBACK, 
Boolean.FALSE);
+        this.fallbackPolicy =
+                fallbackEnabled ? FallbackPolicy.remainingCandidates() : 
FallbackPolicy.none();
+
+        // Default candidate used on a routing miss (strategy abstains / names 
a non-candidate).
+        // Validated at construction so a typo is a clear config error, not a 
per-request failure.
+        String configuredDefault = descriptor.getArgument(ARG_DEFAULT);
+        if (configuredDefault != null && 
!candidateNames.contains(configuredDefault)) {
+            throw new IllegalArgumentException(
+                    "ChatModelRouter 'default' '"
+                            + configuredDefault
+                            + "' is not one of the configured candidates "
+                            + candidateNames);
+        }
+        this.defaultCandidate =
+                configuredDefault != null ? configuredDefault : 
candidates.get(0).getName();
+    }
+
+    /**
+     * The router has no connection of its own to resolve (it delegates to candidate models, each of
+     * which resolves its own). Override to skip the base class's connection resolution.
+     */
+    @Override
+    public void open() {

Review Comment:
   Good point. The stronger “runs once per conversation” wording
overstated the guarantee. I softened it to “typically runs once per
conversation” and documented that this is best-effort memoization: two
concurrent first requests for the same key can each compute the selection,
but the synchronized backing map prevents corruption and last-writer-wins is
benign, so I did not add a lock around the compute-and-store step.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to