This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 308aa0f33bf188b90c923e896a817e7870f6b8b6 Author: Dream95 <[email protected]> AuthorDate: Fri May 15 19:39:29 2026 +0800 [improve][fn] make built-in connector reload incremental (#25773) Signed-off-by: Dream95 <[email protected]> (cherry picked from commit 02cab7abea62740d9a415f16fc45bcb6af963175) --- .../org/apache/pulsar/common/nar/FileUtils.java | 31 +++++ .../org/apache/pulsar/common/nar/NarUnpacker.java | 33 +---- .../pulsar/functions/worker/ConnectorsManager.java | 28 +++- .../ConnectorsManagerReloadConnectorsTest.java | 82 ++++++++++++ .../pulsar/functions/utils/io/Connector.java | 26 ++++ .../pulsar/functions/utils/io/ConnectorUtils.java | 78 +++++++++++- .../functions/utils/io/ReloadConnectorsResult.java | 29 +++++ .../utils/io/ConnectorUtilsReloadTest.java | 141 +++++++++++++++++++++ 8 files changed, 407 insertions(+), 41 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java index 7f313676345..55711850022 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java @@ -25,8 +25,11 @@ package org.apache.pulsar.common.nar; import java.io.File; +import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.Collection; import java.util.zip.ZipEntry; @@ -44,6 +47,34 @@ public class FileUtils { public static final long MILLIS_BETWEEN_ATTEMPTS = 50L; + /** + * Calculates an md5 sum of the specified file. 
+ * + * @param file + * to calculate the md5sum of + * @return the md5sum bytes + * @throws IOException + * if cannot read file + */ + public static byte[] calculateMd5sum(final File file) throws IOException { + try (final FileInputStream inputStream = new FileInputStream(file)) { + // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case + final MessageDigest md5 = MessageDigest.getInstance("md5"); + + final byte[] buffer = new byte[1024]; + int read = inputStream.read(buffer); + + while (read > -1) { + md5.update(buffer, 0, read); + read = inputStream.read(buffer); + } + + return md5.digest(); + } catch (NoSuchAlgorithmException nsae) { + throw new IllegalArgumentException(nsae); + } + } + public static void ensureDirectoryExistAndCanReadAndWrite(final File dir) throws IOException { if (dir.exists() && !dir.isDirectory()) { throw new IOException(dir.getAbsolutePath() + " is not a directory"); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java index ef802674b42..2ea35f64180 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java @@ -25,7 +25,6 @@ package org.apache.pulsar.common.nar; import com.google.common.annotations.VisibleForTesting; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -35,8 +34,6 @@ import java.nio.channels.FileLock; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.Base64; import java.util.Enumeration; import java.util.concurrent.ConcurrentHashMap; @@ -77,7 +74,7 @@ public class NarUnpacker { throw new IOException("Cannot create " + parentDirectory); } } - 
String md5Sum = Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(nar)); + String md5Sum = Base64.getUrlEncoder().withoutPadding().encodeToString(FileUtils.calculateMd5sum(nar)); // ensure that one process can extract the files File lockFile = new File(parentDirectory, "." + md5Sum + ".lock"); // prevent OverlappingFileLockException by ensuring that one thread tries to create a lock in this JVM @@ -171,32 +168,4 @@ public class NarUnpacker { } } } - - /** - * Calculates an md5 sum of the specified file. - * - * @param file - * to calculate the md5sum of - * @return the md5sum bytes - * @throws IOException - * if cannot read file - */ - protected static byte[] calculateMd5sum(final File file) throws IOException { - try (final FileInputStream inputStream = new FileInputStream(file)) { - // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient for this use case - final MessageDigest md5 = MessageDigest.getInstance("md5"); - - final byte[] buffer = new byte[1024]; - int read = inputStream.read(buffer); - - while (read > -1) { - md5.update(buffer, 0, read); - read = inputStream.read(buffer); - } - - return md5.digest(); - } catch (NoSuchAlgorithmException nsae) { - throw new IllegalArgumentException(nsae); - } - } } \ No newline at end of file diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java index 19d31d0f63b..a8ee6f3ce20 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.nio.file.Path; +import java.util.Collection; import java.util.List; import java.util.TreeMap; import 
java.util.stream.Collectors; @@ -31,6 +32,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.utils.io.Connector; import org.apache.pulsar.functions.utils.io.ConnectorUtils; +import org.apache.pulsar.functions.utils.io.ReloadConnectorsResult; @Slf4j public class ConnectorsManager implements AutoCloseable { @@ -48,12 +50,16 @@ public class ConnectorsManager implements AutoCloseable { } private static TreeMap<String, Connector> createConnectors(WorkerConfig workerConfig) throws IOException { - boolean enableClassloading = workerConfig.getEnableClassloadingOfBuiltinFiles() - || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName()); + boolean enableClassloading = isEnableClassloading(workerConfig); return ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(), workerConfig.getNarExtractionDirectory(), enableClassloading); } + private static boolean isEnableClassloading(WorkerConfig workerConfig) { + return workerConfig.getEnableClassloadingOfBuiltinFiles() + || ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName()); + } + @VisibleForTesting public void addConnector(String connectorType, Connector connector) { connectors.put(connectorType, connector); @@ -89,9 +95,13 @@ public class ConnectorsManager implements AutoCloseable { } public void reloadConnectors(WorkerConfig workerConfig) throws IOException { - TreeMap<String, Connector> oldConnectors = connectors; - this.connectors = createConnectors(workerConfig); - closeConnectors(oldConnectors); + ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( + this.connectors, + workerConfig.getConnectorsDirectory(), + workerConfig.getNarExtractionDirectory(), + isEnableClassloading(workerConfig)); + this.connectors = reload.connectors(); + closeConnectors(reload.connectorsToClose()); } @Override @@ -99,14 
+109,18 @@ public class ConnectorsManager implements AutoCloseable { closeConnectors(connectors); } - private void closeConnectors(TreeMap<String, Connector> connectorMap) { - connectorMap.values().forEach(connector -> { + private void closeConnectors(Collection<Connector> connectors) { + connectors.forEach(connector -> { try { connector.close(); } catch (Exception e) { log.warn("Failed to close connector", e); } }); + } + + private void closeConnectors(TreeMap<String, Connector> connectorMap) { + closeConnectors(connectorMap.values()); connectorMap.clear(); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java new file mode 100644 index 00000000000..82c280f97ad --- /dev/null +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.worker; + +import static org.testng.Assert.assertSame; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.functions.utils.io.Connector; +import org.testng.annotations.Test; + +/** + * Tests {@link ConnectorsManager#reloadConnectors(WorkerConfig)} for incremental reload behavior, + * ensuring unchanged connectors are reused instead of being recreated. + */ +public class ConnectorsManagerReloadConnectorsTest { + + private static void writeMinimalNar(Path narPath, ConnectorDefinition def) throws IOException { + byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def); + try (OutputStream os = Files.newOutputStream(narPath); + ZipOutputStream zos = new ZipOutputStream(os)) { + ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml"); + zos.putNextEntry(entry); + zos.write(yaml); + zos.closeEntry(); + } + } + + private static ConnectorDefinition sampleDefinition(String name) { + ConnectorDefinition def = new ConnectorDefinition(); + def.setName(name); + def.setSinkClass("org.example.Sink"); + def.setSourceClass("org.example.Source"); + return def; + } + + @Test + public void reloadWhenNarUnchangedReusesSameConnectorInstance() throws Exception { + Path dir = Files.createTempDirectory("mgr-conn-reload-"); + Path nar = dir.resolve("c1.nar"); + writeMinimalNar(nar, sampleDefinition("c-one")); + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setConnectorsDirectory(dir.toString()); + workerConfig.setNarExtractionDirectory(NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR); + workerConfig.setEnableClassloadingOfBuiltinFiles(false); + + try (ConnectorsManager 
manager = new ConnectorsManager(workerConfig)) { + Connector before = manager.getConnector("c-one"); + before.getConnectorFunctionPackage(); + + manager.reloadConnectors(workerConfig); + + Connector after = manager.getConnector("c-one"); + assertSame(after, before); + before.getConnectorFunctionPackage(); + } + } + +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java index 5fcc22747c5..bff477b40cd 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java @@ -28,6 +28,8 @@ import org.apache.pulsar.functions.utils.ValidatableFunctionPackage; public class Connector implements AutoCloseable { private final Path archivePath; + /** MD5 hex of archive file contents; empty when {@link #archivePath} is null (test doubles). */ + private final String archiveMd5Hex; private final String narExtractionDirectory; private final boolean enableClassloading; private ValidatableFunctionPackage connectorFunctionPackage; @@ -38,16 +40,40 @@ public class Connector implements AutoCloseable { public Connector(Path archivePath, ConnectorDefinition connectorDefinition, String narExtractionDirectory, boolean enableClassloading) { + this(archivePath, connectorDefinition, narExtractionDirectory, enableClassloading, null); + } + + /** + * @param precomputedArchiveMd5Hex MD5 hex of {@code archivePath} contents; if null and path is non-null, + * the hash is computed once at construction time. 
+ */ + public Connector(Path archivePath, ConnectorDefinition connectorDefinition, String narExtractionDirectory, + boolean enableClassloading, String precomputedArchiveMd5Hex) { this.archivePath = archivePath; this.connectorDefinition = connectorDefinition; this.narExtractionDirectory = narExtractionDirectory; this.enableClassloading = enableClassloading; + if (archivePath != null) { + try { + this.archiveMd5Hex = precomputedArchiveMd5Hex != null + ? precomputedArchiveMd5Hex + : ConnectorUtils.computeArchiveMd5Hex(archivePath); + } catch (java.io.IOException e) { + throw new java.io.UncheckedIOException(e); + } + } else { + this.archiveMd5Hex = ""; + } } public Path getArchivePath() { return archivePath; } + public String getArchiveMd5Hex() { + return archiveMd5Hex; + } + public synchronized ValidatableFunctionPackage getConnectorFunctionPackage() { checkState(); if (connectorFunctionPackage == null) { diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java index 71cab749ba0..d08a4942884 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java @@ -24,7 +24,9 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HexFormat; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ import net.bytebuddy.description.type.TypeDefinition; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.io.ConfigFieldDefinition; import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.nar.FileUtils; import org.apache.pulsar.common.nar.NarClassLoader; import 
org.apache.pulsar.functions.utils.Exceptions; import org.apache.pulsar.functions.utils.ValidatableFunctionPackage; @@ -52,7 +55,16 @@ import org.apache.pulsar.io.core.annotations.FieldDoc; @Slf4j public class ConnectorUtils { - private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml"; + /** + * Computes MD5 digest of a file as lower-case hex (for connector archive identity on reload). + */ + public static String computeArchiveMd5Hex(Path path) throws IOException { + return calculateMd5Hex(path.toAbsolutePath().normalize().toFile()); + } + + private static String calculateMd5Hex(File file) throws IOException { + return HexFormat.of().formatHex(FileUtils.calculateMd5sum(file)); + } /** * Extract the Pulsar IO Source class from a connector archive. @@ -171,7 +183,8 @@ public class ConnectorUtils { try { ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile()); log.info("Found connector {} from {}", cntDef, archive); - Connector connector = new Connector(archive, cntDef, narExtractionDirectory, enableClassloading); + Connector connector = new Connector(archive, cntDef, narExtractionDirectory, + enableClassloading); connectors.put(cntDef.getName(), connector); } catch (Throwable t) { log.warn("Failed to load connector from {}", archive, t); @@ -180,4 +193,65 @@ public class ConnectorUtils { } return connectors; } + + /** + * Reloads connectors from disk against {@code previous}, reusing {@link Connector} instances when path and + * archive MD5 are unchanged (keeps class loaders open). New or changed archives get new instances. + * <p> + * {@link ReloadConnectorsResult#connectorsToClose()} lists connectors evicted from the active set (replaced or + * no longer present on disk); the caller must {@link Connector#close()} each (typically via + * {@code ConnectorsManager}). 
+ * + * @param previous connectors from the previous scan (may be empty, never null) + * @param connectorsDirectory same semantics as {@link #searchForConnectors} + * @param narExtractionDirectory same semantics as {@link #searchForConnectors} + * @param enableClassloading same semantics as {@link #searchForConnectors} + * @return new map keyed by connector name (reused values are identical instances from {@code previous}) and + * connectors the caller should close + */ + public static ReloadConnectorsResult reloadConnectors( + TreeMap<String, Connector> previous, + String connectorsDirectory, + String narExtractionDirectory, + boolean enableClassloading) throws IOException { + + TreeMap<String, Connector> remaining = new TreeMap<>(previous); + TreeMap<String, Connector> next = new TreeMap<>(); + List<Connector> toClose = new ArrayList<>(); + + Path dir = Paths.get(connectorsDirectory).toAbsolutePath().normalize(); + if (!dir.toFile().exists()) { + toClose.addAll(remaining.values()); + return new ReloadConnectorsResult(next, toClose); + } + + try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.nar")) { + for (Path archive : stream) { + try { + ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(archive.toFile()); + String name = cntDef.getName(); + String md5Hex = computeArchiveMd5Hex(archive); + Connector prev = remaining.remove(name); + if (prev != null + && prev.getArchivePath() != null + && archive.equals(prev.getArchivePath()) + && md5Hex.equals(prev.getArchiveMd5Hex())) { + next.put(name, prev); + } else { + if (prev != null) { + log.info("Reloading changed connector name={} archive={} previousArchive={}", name, archive, + prev.getArchivePath()); + toClose.add(prev); + } + next.put(name, new Connector(archive, cntDef, narExtractionDirectory, enableClassloading, + md5Hex)); + } + } catch (Throwable t) { + log.warn("Failed to load connector archive={}", archive, t); + } + } + } + toClose.addAll(remaining.values()); + return new 
ReloadConnectorsResult(next, toClose); + } } diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java new file mode 100644 index 00000000000..cdfab35692c --- /dev/null +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.utils.io; + +import java.util.List; +import java.util.TreeMap; + +/** + * Result of {@link ConnectorUtils#reloadConnectors}: the new connector map and connectors evicted from the active set + * that the caller must close. 
+ */ +public record ReloadConnectorsResult(TreeMap<String, Connector> connectors, List<Connector> connectorsToClose) { +} diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java new file mode 100644 index 00000000000..07b6459369a --- /dev/null +++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.functions.utils.io; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.TreeMap; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; +import org.apache.pulsar.common.io.ConnectorDefinition; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.Test; + +@Test +public class ConnectorUtilsReloadTest { + + private static void closeEvicted(ReloadConnectorsResult reload) throws Exception { + for (Connector c : reload.connectorsToClose()) { + c.close(); + } + } + + private static void writeMinimalNar(Path narPath, ConnectorDefinition def) throws IOException { + byte[] yaml = ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def); + try (OutputStream os = Files.newOutputStream(narPath); + ZipOutputStream zos = new ZipOutputStream(os)) { + ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml"); + zos.putNextEntry(entry); + zos.write(yaml); + zos.closeEntry(); + } + } + + private static ConnectorDefinition sampleDefinition(String name) { + ConnectorDefinition def = new ConnectorDefinition(); + def.setName(name); + def.setSinkClass("org.example.Sink"); + def.setSourceClass("org.example.Source"); + return def; + } + + /** + * Historical {@code ConnectorsManager} reload replaced the whole map and closed every prior + * {@link Connector}, even when NAR files were unchanged. A caller keeping a reference to the + * pre-reload connector would then hit {@link IllegalStateException} on lazy use. 
+ * <p> + * Incremental reload must evict nothing, reuse the same instance, and leave that instance usable + * after the caller closes only {@link ReloadConnectorsResult#connectorsToClose()}. + */ + @Test + public void reloadUnchangedNarEvictsNothingAndKeepsSameConnectorUsable() throws Exception { + Path dir = Files.createTempDirectory("conn-reload-"); + Path nar = dir.resolve("c1.nar"); + writeMinimalNar(nar, sampleDefinition("c-one")); + + TreeMap<String, Connector> first = + ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + Connector c1 = first.get("c-one"); + c1.getConnectorFunctionPackage(); + + ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + assertTrue(reload.connectorsToClose().isEmpty()); + closeEvicted(reload); + TreeMap<String, Connector> second = reload.connectors(); + + assertSame(second.get("c-one"), c1); + c1.getConnectorFunctionPackage(); + } + + @Test + public void reloadReopensConnectorWhenNarContentChanges() throws Exception { + Path dir = Files.createTempDirectory("conn-reload-"); + Path nar = dir.resolve("c1.nar"); + writeMinimalNar(nar, sampleDefinition("c-one")); + + TreeMap<String, Connector> first = + ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + Connector before = first.get("c-one"); + + ConnectorDefinition updated = sampleDefinition("c-one"); + updated.setDescription("changed"); + writeMinimalNar(nar, updated); + + ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + closeEvicted(reload); + TreeMap<String, Connector> second = reload.connectors(); + + assertNotSame(second.get("c-one"), before); + assertThrows(IllegalStateException.class, before::getConnectorFunctionPackage); + } + + @Test + public void reloadClosesConnectorsRemovedFromDirectory() 
throws Exception { + Path dir = Files.createTempDirectory("conn-reload-"); + Path nar1 = dir.resolve("a.nar"); + Path nar2 = dir.resolve("b.nar"); + writeMinimalNar(nar1, sampleDefinition("conn-a")); + writeMinimalNar(nar2, sampleDefinition("conn-b")); + + TreeMap<String, Connector> first = + ConnectorUtils.searchForConnectors(dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + Connector removed = first.get("conn-b"); + Files.delete(nar2); + + ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors( + first, dir.toString(), NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false); + closeEvicted(reload); + TreeMap<String, Connector> second = reload.connectors(); + + assertEquals(second.size(), 1); + assertSame(second.get("conn-a"), first.get("conn-a")); + assertThrows(IllegalStateException.class, removed::getConnectorFunctionPackage); + } + +}
