wzhero1 commented on code in PR #655:
URL: https://github.com/apache/flink-agents/pull/655#discussion_r3263199104


##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/repository/URLSkillRepository.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.skill.repository;
+
+import org.apache.flink.agents.runtime.skill.AgentSkill;
+import org.apache.flink.agents.runtime.skill.SkillRepository;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Skill repository backed by an http(s) URL pointing to a zip.
+ *
+ * <p>The zip is downloaded to a temp file and extracted into a process-local temp directory. The
+ * downloaded zip itself is removed once extraction completes; the extracted directory is released
+ * via {@link #close()} (cascaded through {@code SkillManager} → {@code ResourceContextImpl} →
+ * {@code ResourceCache} on operator close). A JVM shutdown hook acts as fallback cleanup if {@code
+ * close()} is never called.
+ *
+ * <p>Composes a {@link SkillDirectoryReader} for the on-disk parsing; the materialization happens
+ * upfront in the constructor (no abuse of {@code super(...)}).
+ */
+public final class URLSkillRepository implements SkillRepository {
+
+    private static final int REQUEST_TIMEOUT_MS = 90_000;
+
+    private final String url;

Review Comment:
   **Field name `owned` is an adjective without a subject.**
   
   `Materialized` is already a noun. Rename the field to `materialization` (matches the class name), or rename the inner class to `MaterializedDir` so the field reads `materializedDir`. The current call sites `this.owned.close()` and `if (owned == null)` don't read well.
   
   The same applies to `ClasspathSkillRepository.java:73` and `FileSystemSkillRepository.java:51`.



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillRepository.java:
##########
@@ -20,14 +20,19 @@
 
 import javax.annotation.Nullable;
 
+import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
 
 /**
  * Source of skills. Mirrors the Python {@code
  * flink_agents.runtime.skill.skill_repository.SkillRepository}.
+ *
+ * <p>Implementations that own a materialized temp directory (URL / classpath / zip) should override

Review Comment:
   **Three sibling repos duplicate the same 5 delegating methods + state** (`reader`, `materialization`, plus `getSkill`/`getSkills`/`getResources`/`getSkillDir`/`close`): ~45 lines of boilerplate, and roughly 15 more for each future source.
   
   Extract `AbstractMaterializedSkillRepository` to hold the shared state and implement the delegators once; concrete repos then reduce to source-specific I/O plus an abstract `getOrigin()`.
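   
   A minimal sketch of that extraction, under stated assumptions: `SkillReaderSketch`, `MaterializationSketch` and `UrlRepositorySketch` are hypothetical stand-ins, and only two of the five delegators are shown because the real method signatures are not visible in this hunk.
   
   ```java
   import java.nio.file.Path;
   import java.util.List;

   // Hypothetical stand-in for the on-disk reader; the PR's real type may differ.
   interface SkillReaderSketch {
       List<String> getSkills();

       Path getSkillDir(String skillName);
   }

   // Hypothetical handle for an owned temp directory.
   interface MaterializationSketch {
       void close();
   }

   // Shared state and delegating methods are written once here.
   abstract class AbstractMaterializedSkillRepository implements AutoCloseable {
       private final SkillReaderSketch reader;
       private final MaterializationSketch materialization; // null when nothing is owned

       protected AbstractMaterializedSkillRepository(
               SkillReaderSketch reader, MaterializationSketch materialization) {
           this.reader = reader;
           this.materialization = materialization;
       }

       /** Source-specific description, e.g. for duplicate-skill warnings. */
       abstract String getOrigin();

       public List<String> getSkills() {
           return reader.getSkills();
       }

       public Path getSkillDir(String skillName) {
           return reader.getSkillDir(skillName);
       }

       @Override
       public void close() {
           if (materialization != null) {
               materialization.close();
           }
       }
   }

   // A concrete repo shrinks to its own state plus getOrigin().
   final class UrlRepositorySketch extends AbstractMaterializedSkillRepository {
       private final String url;

       UrlRepositorySketch(String url, SkillReaderSketch reader, MaterializationSketch m) {
           super(reader, m);
           this.url = url;
       }

       @Override
       String getOrigin() {
           return "url:" + url;
       }
   }
   ```
   
   Because the download has to finish before `super(...)` runs, a static factory on each concrete repo is one way to keep materialization out of the `super(...)` argument list.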



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -111,38 +148,94 @@ public List<String> getSkillDirs(List<String> names) {
     @Nullable
     public Path getSkillDir(String skillName) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            return ((FileSystemSkillRepository) repo).getBaseDir().resolve(skillName);
-        }
-        return null;
+        return repo == null ? null : repo.getSkillDir(skillName);
     }
 
     /** Resolve a skill resource's relative path to an absolute path, or {@code null} if missing. */
     @Nullable
     public Path resolveResourcePath(String skillName, String resourcePath) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            Path resolved =
-                    ((FileSystemSkillRepository) repo)
-                            .getBaseDir()
-                            .resolve(skillName)
-                            .resolve(resourcePath);
-            if (Files.isRegularFile(resolved)) {
-                return resolved;
+        if (repo == null) {
+            return null;
+        }
+        Path dir = repo.getSkillDir(skillName);
+        if (dir == null) {
+            return null;
+        }
+        Path resolved = dir.resolve(resourcePath);
+        return Files.isRegularFile(resolved) ? resolved : null;
+    }
+
+    private void loadAll() {
+        for (SkillSourceSpec spec : config.getSources()) {
+            try {
+                SkillRepository repo =
+                        SkillSourceRegistry.get(spec.getScheme())
+                                .open(spec.getParams(), classLoader);
+                registerRepo(repo, originOf(spec));
+            } catch (IOException | IllegalArgumentException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from " + spec.getScheme() + ":" + spec.getParams(),
+                        e);
+            }
+        }
+    }
+
+    /** Build a {@link SkillOrigin} from a spec for diagnostics (WARN on duplicates, etc.). */
+    private static SkillOrigin originOf(SkillSourceSpec spec) {
+        Map<String, String> p = spec.getParams();
+        String location;
+        switch (spec.getScheme()) {
+            case "local":
+                location = p.getOrDefault("path", "");
+                break;
+            case "url":
+                location = p.getOrDefault("url", "");
+                break;
+            case "classpath":
+                location = p.getOrDefault("resource", "");
+                break;
+            case "package":
+                location = p.getOrDefault("package", "") + "/" + p.getOrDefault("resource", "");
+                break;
+            default:
+                location = p.toString();
+        }
+        return new SkillOrigin(spec.getScheme(), location);
+    }
+
+    /**
+     * Close every owned {@link SkillRepository}, releasing any temp directories materialized for
+     * URL / classpath-zip / classpath-jar sources. Idempotent.
+     */
+    @Override
+    public void close() {
+        // repos may map multiple skill names to the same repo instance; dedup by identity.
+        Set<SkillRepository> unique = Collections.newSetFromMap(new IdentityHashMap<>());
+        unique.addAll(repos.values());

Review Comment:
   **`SkillManager.close()` swallows exceptions silently; inconsistent with 
`ResourceCache.close()`.**
   
   ```java
   for (SkillRepository repo : unique) {
       try { repo.close(); } catch (Exception ignored) {}
   }
   ```
   `ResourceCache.close()` (lines 135-160) collects the first exception, attaches later ones as suppressed, and rethrows. The same close cascade therefore uses two styles, and real failures (locked file, permission denied, disk full) give callers no signal. `ResourceContextImpl.ensureSkillManager()` (line 116) has the same silent catch on config swap.
   
   Minimum: `LOG.warn(..., e)`. Preferable: align with `ResourceCache.close()`'s rethrow pattern.
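   
   For reference, a minimal sketch of the collect-and-rethrow pattern described above. `CloseAllSketch.closeAll` is a hypothetical helper, not the actual `ResourceCache.close()` body, which is not shown in this hunk.
   
   ```java
   import java.util.Collections;
   import java.util.IdentityHashMap;
   import java.util.Set;

   final class CloseAllSketch {
       /**
        * Closes every distinct instance once. The first failure is rethrown after all
        * instances were attempted; later failures are attached as suppressed.
        */
       static void closeAll(Iterable<? extends AutoCloseable> closeables) throws Exception {
           // Dedup by identity, as the diff does for repos shared across skill names.
           Set<AutoCloseable> unique = Collections.newSetFromMap(new IdentityHashMap<>());
           for (AutoCloseable c : closeables) {
               unique.add(c);
           }
           Exception first = null;
           for (AutoCloseable c : unique) {
               try {
                   c.close();
               } catch (Exception e) {
                   if (first == null) {
                       first = e;
                   } else {
                       first.addSuppressed(e);
                   }
               }
           }
           if (first != null) {
               throw first;
           }
       }
   }
   ```
   
   If `SkillManager.close()` must stay free of checked exceptions, the collected failure could be wrapped in an unchecked exception, or logged at WARN as the minimum option suggests.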



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/AgentSkill.java:
##########
@@ -44,6 +44,7 @@ public final class AgentSkill {
     @Nullable private volatile Map<String, String> resources;
     @Nullable private Supplier<Map<String, String>> resourceLoader;
     private volatile boolean activated;
+    @Nullable private volatile SkillOrigin origin;

Review Comment:
   **Java/Python serialization surface for `origin` is asymmetric.**
   
   Python's `AgentSkill` extends `BaseModel`, so `origin` appears in the default `model_dump()` output. Java's `AgentSkill` is a POJO with no Jackson annotations, so any future debug/checkpoint/Pemja serialization path would silently drop `origin` on the Java side while Python emits it. This is the same class of cross-language drop that A3 fixed at the `Skills` envelope.
   
   Add `@JsonProperty("origin")` + `@JsonInclude(NON_NULL)`. The cost is near zero now and grows once `AgentSkill` travels on the wire.



##########
plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java:
##########
@@ -532,11 +533,11 @@ private void addSkills(Map<String, Skills> skillsObjects) throws Exception {
                         ResourceType.TOOL,
                         new ResourceDescriptor(BashTool.class.getName(), new HashMap<>())));
 
-        LinkedHashSet<String> paths = new LinkedHashSet<>();
+        LinkedHashSet<SkillSourceSpec> sources = new LinkedHashSet<>();
         for (Skills s : skillsObjects.values()) {

Review Comment:
   **Multi-`@Skills` merge order is non-deterministic on Java; WARN reports 
collisions but the "winner" is unspecified.**
   
   `AgentPlan.addSkills` iterates `skillsObjects.values()` in `LinkedHashMap` order, but the insertion order comes from `Class.getMethods()`, whose Javadoc states that the returned array is not sorted and not in any particular order. The test acknowledges this at `AgentPlanDeclareSkillsTest:163`: "Class.getMethods() does not preserve declaration order".
   
   Two `@Skills` methods that declare the same skill name can pick different winners across deployments / JDK versions; the WARN fires but is not reproducible. Python `dict` preserves insertion order, so Python is deterministic and Java is the bad case.
   
   Minimum fix: sort by method name lexicographically before merging, plus a javadoc note. Stronger: `@Skills(priority=N)` or fail fast on collision.
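   
   A rough sketch of the minimum fix, assuming discovery goes through `Class.getMethods()`. `SkillsSketch` is a hypothetical stand-in for the real `@Skills` annotation, and `DeterministicSkillsOrder` and `DemoAgentSketch` are illustrative names only.
   
   ```java
   import java.lang.annotation.ElementType;
   import java.lang.annotation.Retention;
   import java.lang.annotation.RetentionPolicy;
   import java.lang.annotation.Target;
   import java.lang.reflect.Method;
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;

   // Hypothetical stand-in for the PR's @Skills annotation.
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @interface SkillsSketch {}

   final class DeterministicSkillsOrder {
       /** Returns annotated public methods sorted by name, then by full signature. */
       static List<Method> annotatedMethodsInStableOrder(Class<?> type) {
           List<Method> result = new ArrayList<>();
           for (Method m : type.getMethods()) { // order of this array is unspecified
               if (m.isAnnotationPresent(SkillsSketch.class)) {
                   result.add(m);
               }
           }
           // The signature tie-breaker keeps overloads in a fixed order too.
           result.sort(
                   Comparator.<Method, String>comparing(Method::getName)
                           .thenComparing(Method::toGenericString));
           return result;
       }
   }

   final class DemoAgentSketch {
       @SkillsSketch
       public static String zetaSkills() {
           return "z";
       }

       @SkillsSketch
       public static String alphaSkills() {
           return "a";
       }

       public static String notSkills() {
           return "n";
       }
   }
   ```
   
   With this ordering the merge winner depends only on method names, so it stays the same across JVMs; whether first or last should win still needs to be stated in the javadoc.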



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -18,33 +18,63 @@
 
 package org.apache.flink.agents.runtime.skill;
 
+import org.apache.flink.agents.api.skills.SkillSourceSpec;
 import org.apache.flink.agents.api.skills.Skills;
-import org.apache.flink.agents.runtime.skill.repository.FileSystemSkillRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Loads and indexes all skills referenced by a {@link Skills} configuration.
  *
  * <p>Mirrors the Python {@code flink_agents.runtime.skill.skill_manager.SkillManager}.
+ *
+ * <p>Owned by {@code ResourceContextImpl}; closed via {@code ResourceCache.close()} on operator
+ * close, which cascades to each repository's temp directory. Avoid constructing a {@code
+ * SkillManager} outside that flow without {@code try-with-resources}.
  */
-public class SkillManager {
+public class SkillManager implements AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SkillManager.class);
 
     private final Skills config;
+    private final ClassLoader classLoader;
     private final Map<String, AgentSkill> skills = new LinkedHashMap<>();
     private final Map<String, SkillRepository> repos = new HashMap<>();
 
-    public SkillManager(Skills config) {
+    /**
+     * Construct a {@code SkillManager} that resolves {@code classpath:} sources against {@code
+     * classLoader}. Production code passes the Flink user-code class loader (threaded through
+     * {@code ResourceCache} from {@code ActionExecutionOperator}); tests / standalone use may use
+     * {@link #SkillManager(Skills)}.
+     */
+    public SkillManager(Skills config, ClassLoader classLoader) {

Review Comment:
   **Constructor-failure leak: partial `loadAll()` leaves registered repos with 
no closeable handle.**
   
   If the 2nd of 3 sources succeeds (repo + hook registered) but the 3rd throws, the constructor exits with `IllegalStateException`, so the caller never gets a `SkillManager` reference and cannot call `close()`. The repos registered so far keep their hooks and temp dirs until JVM exit.
   
   Same monotonic-leak class as review #10, just triggered by partial construction instead of hot-reload. 5-line fix: wrap `loadAll`'s loop in try/catch and call `close()` before rethrowing.
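   
   Roughly, the fix could look like the sketch below. `RepoOpenerSketch` and `ManagerSketch` are hypothetical stand-ins for the source handlers and `SkillManager`; the real constructor takes a `Skills` config and a class loader.
   
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for "open one configured source".
   interface RepoOpenerSketch {
       AutoCloseable open() throws IOException;
   }

   final class ManagerSketch implements AutoCloseable {
       private final List<AutoCloseable> repos = new ArrayList<>();

       ManagerSketch(List<RepoOpenerSketch> sources) {
           try {
               for (RepoOpenerSketch source : sources) {
                   repos.add(source.open());
               }
           } catch (IOException | RuntimeException e) {
               // Release whatever was opened before the failure, then rethrow.
               try {
                   close();
               } catch (RuntimeException closeFailure) {
                   e.addSuppressed(closeFailure);
               }
               throw new IllegalStateException("Failed to load skills", e);
           }
       }

       @Override
       public void close() {
           RuntimeException failure = null;
           for (AutoCloseable repo : repos) {
               try {
                   repo.close();
               } catch (Exception e) {
                   if (failure == null) {
                       failure = new IllegalStateException("Failed to close repository", e);
                   } else {
                       failure.addSuppressed(e);
                   }
               }
           }
           repos.clear(); // makes a second close() a no-op
           if (failure != null) {
               throw failure;
           }
       }
   }
   ```
   
   Calling an instance method from a constructor is safe here only because the class is `final` and `repos` is initialized before the constructor body runs.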



##########
runtime/src/main/java/org/apache/flink/agents/runtime/skill/SkillManager.java:
##########
@@ -111,38 +148,94 @@ public List<String> getSkillDirs(List<String> names) {
     @Nullable
     public Path getSkillDir(String skillName) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            return ((FileSystemSkillRepository) repo).getBaseDir().resolve(skillName);
-        }
-        return null;
+        return repo == null ? null : repo.getSkillDir(skillName);
     }
 
     /** Resolve a skill resource's relative path to an absolute path, or {@code null} if missing. */
     @Nullable
     public Path resolveResourcePath(String skillName, String resourcePath) {
         SkillRepository repo = repos.get(skillName);
-        if (repo instanceof FileSystemSkillRepository) {
-            Path resolved =
-                    ((FileSystemSkillRepository) repo)
-                            .getBaseDir()
-                            .resolve(skillName)
-                            .resolve(resourcePath);
-            if (Files.isRegularFile(resolved)) {
-                return resolved;
+        if (repo == null) {
+            return null;
+        }
+        Path dir = repo.getSkillDir(skillName);
+        if (dir == null) {
+            return null;
+        }
+        Path resolved = dir.resolve(resourcePath);
+        return Files.isRegularFile(resolved) ? resolved : null;
+    }
+
+    private void loadAll() {
+        for (SkillSourceSpec spec : config.getSources()) {
+            try {
+                SkillRepository repo =
+                        SkillSourceRegistry.get(spec.getScheme())
+                                .open(spec.getParams(), classLoader);
+                registerRepo(repo, originOf(spec));
+            } catch (IOException | IllegalArgumentException e) {
+                throw new IllegalStateException(
+                        "Failed to load skills from " + spec.getScheme() + ":" + spec.getParams(),
+                        e);
+            }
+        }
+    }
+
+    /** Build a {@link SkillOrigin} from a spec for diagnostics (WARN on duplicates, etc.). */

Review Comment:
   **`originOf` re-introduces the parallel scheme ladder the registry was meant 
to eliminate.**
   
   ```java
   switch (spec.getScheme()) {
       case "local": ...; case "url": ...;
       case "classpath": ...; case "package": ...;
   }
   ```
   Python's `_origin_of` mirrors this. Adding a scheme such as `s3` now requires multiple edits (registry ×2 + originOf ×2), not 1. Java's `case "package"` is also dead code: no Java `package` handler exists.
   
   Fix: add `default String describeLocation(params)` to `SkillSourceHandler`, so each handler describes its own location and `originOf` needs no per-scheme branches.
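   
   One possible shape for this, as a sketch only: `SkillSourceHandlerSketch`, the two handler classes and `SourceRegistrySketch` are hypothetical, and the real `SkillSourceHandler` also has an `open(...)` method that is left out here.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical reduced handler interface; only describeLocation matters here.
   interface SkillSourceHandlerSketch {
       /** Default mirrors the old switch's fallback: print the raw params. */
       default String describeLocation(Map<String, String> params) {
           return params.toString();
       }
   }

   final class UrlHandlerSketch implements SkillSourceHandlerSketch {
       @Override
       public String describeLocation(Map<String, String> params) {
           return params.getOrDefault("url", "");
       }
   }

   final class LocalHandlerSketch implements SkillSourceHandlerSketch {
       @Override
       public String describeLocation(Map<String, String> params) {
           return params.getOrDefault("path", "");
       }
   }

   final class SourceRegistrySketch {
       private static final Map<String, SkillSourceHandlerSketch> HANDLERS = new HashMap<>();

       static {
           HANDLERS.put("url", new UrlHandlerSketch());
           HANDLERS.put("local", new LocalHandlerSketch());
           // A new scheme is one registration; the default description applies.
           HANDLERS.put("s3", new SkillSourceHandlerSketch() {});
       }

       /** Replaces the per-scheme switch: the handler describes its own location. */
       static String originOf(String scheme, Map<String, String> params) {
           SkillSourceHandlerSketch handler = HANDLERS.get(scheme);
           if (handler == null) {
               throw new IllegalArgumentException("Unknown scheme: " + scheme);
           }
           return scheme + ":" + handler.describeLocation(params);
       }
   }
   ```
   
   The default method keeps existing handlers source-compatible, and a handler overrides it only when the raw params are not a readable description.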



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
