lhotari commented on code in PR #25773:
URL: https://github.com/apache/pulsar/pull/25773#discussion_r3246738967


##########
pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java:
##########
@@ -186,4 +199,66 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
         }
         return connectors;
     }
+
+    /**
+     * Reloads connectors from disk against {@code previous}, reusing {@link Connector} instances when path and
+     * archive MD5 are unchanged (keeps class loaders open). New or changed archives get new instances.
+     * <p>
+     * {@link ReloadConnectorsResult#connectorsToClose()} lists connectors evicted from the active set (replaced or
+     * no longer present on disk); the caller must {@link Connector#close()} each (typically via
+     * {@code ConnectorsManager}).
+     *
+     * @param previous                 connectors from the previous scan (may be empty, never null)
+     * @param connectorsDirectory      same semantics as {@link #searchForConnectors}
+     * @param narExtractionDirectory   same semantics as {@link #searchForConnectors}
+     * @param enableClassloading       same semantics as {@link #searchForConnectors}
+     * @return new map keyed by connector name (reused values are identical instances from {@code previous}) and
+     *         connectors the caller should close
+     */
+    public static ReloadConnectorsResult reloadConnectors(
+            TreeMap<String, Connector> previous,
+            String connectorsDirectory,
+            String narExtractionDirectory,
+            boolean enableClassloading) throws IOException {
+
+        TreeMap<String, Connector> remaining = new TreeMap<>(previous);
+        TreeMap<String, Connector> next = new TreeMap<>();
+        List<Connector> toClose = new ArrayList<>();
+
+        Path dir = Paths.get(connectorsDirectory).toAbsolutePath().normalize();
+        if (!dir.toFile().exists()) {
+            toClose.addAll(remaining.values());
+            return new ReloadConnectorsResult(next, toClose);
+        }
+
+        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.nar")) {
+            for (Path archive : stream) {
+                try {
+                    ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile());
+                    String name = cntDef.getName();
+                    String md5Hex = computeArchiveMd5Hex(archive);
+                    Connector prev = remaining.remove(name);
+                    if (prev != null
+                            && prev.getArchivePath() != null
+                            && archive.equals(prev.getArchivePath())
+                            && md5Hex.equals(prev.getArchiveMd5Hex())) {
+                        next.put(name, prev);
+                    } else {
+                        if (prev != null) {
+                            toClose.add(prev);
+                        }
+                        next.put(name, new Connector(archive, cntDef, narExtractionDirectory, enableClassloading,

Review Comment:
   Add logging here to report that a changed connector was found.
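
A rough sketch of the kind of message this could produce, not a drop-in patch. The class `ConnectorReloadLogging` and its methods `describeChange` and `logChange` are hypothetical names used only for illustration, and `java.util.logging` stands in for whatever logger the class already uses, so that the snippet is self-contained. It assumes the previous MD5 is available (as `prev.getArchiveMd5Hex()` is in the diff) and treats a `null` previous MD5 as "connector was not loaded before".

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.logging.Logger;

public class ConnectorReloadLogging {
    // Stand-in logger for this sketch; the real class would use its existing logger.
    private static final Logger LOG = Logger.getLogger(ConnectorReloadLogging.class.getName());

    /**
     * Builds the log message for a connector that is not reused.
     * A null previousMd5 means there was no previously loaded connector with this name.
     */
    public static String describeChange(String name, Path archive, String previousMd5, String currentMd5) {
        if (previousMd5 == null) {
            return String.format("Found new connector '%s' at %s (md5=%s)", name, archive, currentMd5);
        }
        return String.format("Connector '%s' at %s changed (md5 %s -> %s); reloading",
                name, archive, previousMd5, currentMd5);
    }

    /** Logs the message at INFO level; hypothetical helper for the else branch of the reload loop. */
    public static void logChange(String name, Path archive, String previousMd5, String currentMd5) {
        LOG.info(describeChange(name, archive, previousMd5, currentMd5));
    }

    public static void main(String[] args) {
        // Example values only; they do not come from the PR.
        logChange("kafka", Paths.get("kafka.nar"), "aaa", "bbb");
        logChange("jdbc", Paths.get("jdbc.nar"), null, "ccc");
    }
}
```

Note that the `else` branch in the diff is also taken when the archive path differs while the MD5 is the same, so the real message may want to include the previous path as well.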



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

