This is an automated email from the ASF dual-hosted git repository.
dao-jun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 02cab7abea6 [improve][fn] make built-in connector reload incremental
(#25773)
02cab7abea6 is described below
commit 02cab7abea62740d9a415f16fc45bcb6af963175
Author: Dream95 <[email protected]>
AuthorDate: Fri May 15 19:39:29 2026 +0800
[improve][fn] make built-in connector reload incremental (#25773)
Signed-off-by: Dream95 <[email protected]>
---
.../org/apache/pulsar/common/nar/FileUtils.java | 31 +++++
.../org/apache/pulsar/common/nar/NarUnpacker.java | 33 +----
.../pulsar/functions/worker/ConnectorsManager.java | 28 +++-
.../ConnectorsManagerReloadConnectorsTest.java | 82 ++++++++++++
.../pulsar/functions/utils/io/Connector.java | 26 ++++
.../pulsar/functions/utils/io/ConnectorUtils.java | 84 +++++++++++-
.../functions/utils/io/ReloadConnectorsResult.java | 29 +++++
.../utils/io/ConnectorUtilsReloadTest.java | 141 +++++++++++++++++++++
8 files changed, 413 insertions(+), 41 deletions(-)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index f231f95ced7..a613d9a55f4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -25,8 +25,11 @@
package org.apache.pulsar.common.nar;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collection;
import java.util.zip.ZipEntry;
@@ -43,6 +46,34 @@ public class FileUtils {
public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
+ /**
+ * Calculates an md5 sum of the specified file.
+ *
+ * @param file
+ * to calculate the md5sum of
+ * @return the md5sum bytes
+ * @throws IOException
+ * if cannot read file
+ */
+ public static byte[] calculateMd5sum(final File file) throws IOException {
+ try (final FileInputStream inputStream = new FileInputStream(file)) {
+ // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient
for this use case
+ final MessageDigest md5 = MessageDigest.getInstance("md5");
+
+ final byte[] buffer = new byte[1024];
+ int read = inputStream.read(buffer);
+
+ while (read > -1) {
+ md5.update(buffer, 0, read);
+ read = inputStream.read(buffer);
+ }
+
+ return md5.digest();
+ } catch (NoSuchAlgorithmException nsae) {
+ throw new IllegalArgumentException(nsae);
+ }
+ }
+
public static void ensureDirectoryExistAndCanReadAndWrite(final File dir)
throws IOException {
if (dir.exists() && !dir.isDirectory()) {
throw new IOException(dir.getAbsolutePath() + " is not a
directory");
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index 4da17e31834..6b71cc0018b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -26,7 +26,6 @@ package org.apache.pulsar.common.nar;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -36,8 +35,6 @@ import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
@@ -78,7 +75,7 @@ public class NarUnpacker {
throw new IOException("Cannot create " + parentDirectory);
}
}
- String md5Sum =
Base64.getUrlEncoder().withoutPadding().encodeToString(calculateMd5sum(nar));
+ String md5Sum =
Base64.getUrlEncoder().withoutPadding().encodeToString(FileUtils.calculateMd5sum(nar));
// ensure that one process can extract the files
File lockFile = new File(parentDirectory, "." + md5Sum + ".lock");
// prevent OverlappingFileLockException by ensuring that one thread
tries to create a lock in this JVM
@@ -177,32 +174,4 @@ public class NarUnpacker {
}
}
}
-
- /**
- * Calculates an md5 sum of the specified file.
- *
- * @param file
- * to calculate the md5sum of
- * @return the md5sum bytes
- * @throws IOException
- * if cannot read file
- */
- protected static byte[] calculateMd5sum(final File file) throws
IOException {
- try (final FileInputStream inputStream = new FileInputStream(file)) {
- // codeql[java/weak-cryptographic-algorithm] - md5 is sufficient
for this use case
- final MessageDigest md5 = MessageDigest.getInstance("md5");
-
- final byte[] buffer = new byte[1024];
- int read = inputStream.read(buffer);
-
- while (read > -1) {
- md5.update(buffer, 0, read);
- read = inputStream.read(buffer);
- }
-
- return md5.digest();
- } catch (NoSuchAlgorithmException nsae) {
- throw new IllegalArgumentException(nsae);
- }
- }
}
\ No newline at end of file
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
index a19220e77ab..79c56823c5b 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/ConnectorsManager.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -31,6 +32,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.ReloadConnectorsResult;
@CustomLog
public class ConnectorsManager implements AutoCloseable {
@@ -48,12 +50,16 @@ public class ConnectorsManager implements AutoCloseable {
}
private static TreeMap<String, Connector> createConnectors(WorkerConfig
workerConfig) throws IOException {
- boolean enableClassloading =
workerConfig.getEnableClassloadingOfBuiltinFiles()
- ||
ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ boolean enableClassloading = isEnableClassloading(workerConfig);
return
ConnectorUtils.searchForConnectors(workerConfig.getConnectorsDirectory(),
workerConfig.getNarExtractionDirectory(), enableClassloading);
}
+ private static boolean isEnableClassloading(WorkerConfig workerConfig) {
+ return workerConfig.getEnableClassloadingOfBuiltinFiles()
+ ||
ThreadRuntimeFactory.class.getName().equals(workerConfig.getFunctionRuntimeFactoryClassName());
+ }
+
@VisibleForTesting
public void addConnector(String connectorType, Connector connector) {
connectors.put(connectorType, connector);
@@ -89,9 +95,13 @@ public class ConnectorsManager implements AutoCloseable {
}
public void reloadConnectors(WorkerConfig workerConfig) throws IOException
{
- TreeMap<String, Connector> oldConnectors = connectors;
- this.connectors = createConnectors(workerConfig);
- closeConnectors(oldConnectors);
+ ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+ this.connectors,
+ workerConfig.getConnectorsDirectory(),
+ workerConfig.getNarExtractionDirectory(),
+ isEnableClassloading(workerConfig));
+ this.connectors = reload.connectors();
+ closeConnectors(reload.connectorsToClose());
}
@Override
@@ -99,14 +109,18 @@ public class ConnectorsManager implements AutoCloseable {
closeConnectors(connectors);
}
- private void closeConnectors(TreeMap<String, Connector> connectorMap) {
- connectorMap.values().forEach(connector -> {
+ private void closeConnectors(Collection<Connector> connectors) {
+ connectors.forEach(connector -> {
try {
connector.close();
} catch (Exception e) {
log.warn().exception(e).log("Failed to close connector");
}
});
+ }
+
+ private void closeConnectors(TreeMap<String, Connector> connectorMap) {
+ closeConnectors(connectorMap.values());
connectorMap.clear();
}
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java
new file mode 100644
index 00000000000..82c280f97ad
--- /dev/null
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/worker/ConnectorsManagerReloadConnectorsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import static org.testng.Assert.assertSame;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.utils.io.Connector;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link ConnectorsManager#reloadConnectors(WorkerConfig)} for
incremental reload behavior,
+ * ensuring unchanged connectors are reused instead of being recreated.
+ */
+public class ConnectorsManagerReloadConnectorsTest {
+
+ private static void writeMinimalNar(Path narPath, ConnectorDefinition def)
throws IOException {
+ byte[] yaml =
ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def);
+ try (OutputStream os = Files.newOutputStream(narPath);
+ ZipOutputStream zos = new ZipOutputStream(os)) {
+ ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml");
+ zos.putNextEntry(entry);
+ zos.write(yaml);
+ zos.closeEntry();
+ }
+ }
+
+ private static ConnectorDefinition sampleDefinition(String name) {
+ ConnectorDefinition def = new ConnectorDefinition();
+ def.setName(name);
+ def.setSinkClass("org.example.Sink");
+ def.setSourceClass("org.example.Source");
+ return def;
+ }
+
+ @Test
+ public void reloadWhenNarUnchangedReusesSameConnectorInstance() throws
Exception {
+ Path dir = Files.createTempDirectory("mgr-conn-reload-");
+ Path nar = dir.resolve("c1.nar");
+ writeMinimalNar(nar, sampleDefinition("c-one"));
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setConnectorsDirectory(dir.toString());
+
workerConfig.setNarExtractionDirectory(NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR);
+ workerConfig.setEnableClassloadingOfBuiltinFiles(false);
+
+ try (ConnectorsManager manager = new ConnectorsManager(workerConfig)) {
+ Connector before = manager.getConnector("c-one");
+ before.getConnectorFunctionPackage();
+
+ manager.reloadConnectors(workerConfig);
+
+ Connector after = manager.getConnector("c-one");
+ assertSame(after, before);
+ before.getConnectorFunctionPackage();
+ }
+ }
+
+}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
index 5fcc22747c5..bff477b40cd 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/Connector.java
@@ -28,6 +28,8 @@ import
org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
public class Connector implements AutoCloseable {
private final Path archivePath;
+ /** MD5 hex of archive file contents; empty when {@link #archivePath} is
null (test doubles). */
+ private final String archiveMd5Hex;
private final String narExtractionDirectory;
private final boolean enableClassloading;
private ValidatableFunctionPackage connectorFunctionPackage;
@@ -38,16 +40,40 @@ public class Connector implements AutoCloseable {
public Connector(Path archivePath, ConnectorDefinition
connectorDefinition, String narExtractionDirectory,
boolean enableClassloading) {
+ this(archivePath, connectorDefinition, narExtractionDirectory,
enableClassloading, null);
+ }
+
+ /**
+ * @param precomputedArchiveMd5Hex MD5 hex of {@code archivePath}
contents; if null and path is non-null,
+ * the hash is computed once at
construction time.
+ */
+ public Connector(Path archivePath, ConnectorDefinition
connectorDefinition, String narExtractionDirectory,
+ boolean enableClassloading, String
precomputedArchiveMd5Hex) {
this.archivePath = archivePath;
this.connectorDefinition = connectorDefinition;
this.narExtractionDirectory = narExtractionDirectory;
this.enableClassloading = enableClassloading;
+ if (archivePath != null) {
+ try {
+ this.archiveMd5Hex = precomputedArchiveMd5Hex != null
+ ? precomputedArchiveMd5Hex
+ : ConnectorUtils.computeArchiveMd5Hex(archivePath);
+ } catch (java.io.IOException e) {
+ throw new java.io.UncheckedIOException(e);
+ }
+ } else {
+ this.archiveMd5Hex = "";
+ }
}
public Path getArchivePath() {
return archivePath;
}
+ public String getArchiveMd5Hex() {
+ return archiveMd5Hex;
+ }
+
public synchronized ValidatableFunctionPackage
getConnectorFunctionPackage() {
checkState();
if (connectorFunctionPackage == null) {
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 9de02a7039a..7595a17b48c 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -24,7 +24,9 @@ import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HexFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -39,6 +41,7 @@ import net.bytebuddy.description.type.TypeDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
@@ -52,7 +55,16 @@ import org.apache.pulsar.io.core.annotations.FieldDoc;
@CustomLog
public class ConnectorUtils {
- private static final String PULSAR_IO_SERVICE_NAME = "pulsar-io.yaml";
+ /**
+ * Computes MD5 digest of a file as lower-case hex (for connector archive
identity on reload).
+ */
+ public static String computeArchiveMd5Hex(Path path) throws IOException {
+ return calculateMd5Hex(path.toAbsolutePath().normalize().toFile());
+ }
+
+ private static String calculateMd5Hex(File file) throws IOException {
+ return HexFormat.of().formatHex(FileUtils.calculateMd5sum(file));
+ }
/**
* Extract the Pulsar IO Source class from a connector archive.
@@ -174,7 +186,8 @@ public class ConnectorUtils {
.attr("connector", cntDef)
.attr("archive", archive)
.log("Found connector");
- Connector connector = new Connector(archive, cntDef,
narExtractionDirectory, enableClassloading);
+ Connector connector = new Connector(archive, cntDef,
narExtractionDirectory,
+ enableClassloading);
connectors.put(cntDef.getName(), connector);
} catch (Throwable t) {
log.warn()
@@ -186,4 +199,71 @@ public class ConnectorUtils {
}
return connectors;
}
+
+ /**
+ * Reloads connectors from disk against {@code previous}, reusing {@link
Connector} instances when path and
+ * archive MD5 are unchanged (keeps class loaders open). New or changed
archives get new instances.
+ * <p>
+ * {@link ReloadConnectorsResult#connectorsToClose()} lists connectors
evicted from the active set (replaced or
+ * no longer present on disk); the caller must {@link Connector#close()}
each (typically via
+ * {@code ConnectorsManager}).
+ *
+ * @param previous connectors from the previous scan (may
be empty, never null)
+ * @param connectorsDirectory same semantics as {@link
#searchForConnectors}
+ * @param narExtractionDirectory same semantics as {@link
#searchForConnectors}
+ * @param enableClassloading same semantics as {@link
#searchForConnectors}
+ * @return new map keyed by connector name (reused values are identical
instances from {@code previous}) and
+ * connectors the caller should close
+ */
+ public static ReloadConnectorsResult reloadConnectors(
+ TreeMap<String, Connector> previous,
+ String connectorsDirectory,
+ String narExtractionDirectory,
+ boolean enableClassloading) throws IOException {
+
+ TreeMap<String, Connector> remaining = new TreeMap<>(previous);
+ TreeMap<String, Connector> next = new TreeMap<>();
+ List<Connector> toClose = new ArrayList<>();
+
+ Path dir = Paths.get(connectorsDirectory).toAbsolutePath().normalize();
+ if (!dir.toFile().exists()) {
+ toClose.addAll(remaining.values());
+ return new ReloadConnectorsResult(next, toClose);
+ }
+
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir,
"*.nar")) {
+ for (Path archive : stream) {
+ try {
+ ConnectorDefinition cntDef =
ConnectorUtils.getConnectorDefinition(archive.toFile());
+ String name = cntDef.getName();
+ String md5Hex = computeArchiveMd5Hex(archive);
+ Connector prev = remaining.remove(name);
+ if (prev != null
+ && prev.getArchivePath() != null
+ && archive.equals(prev.getArchivePath())
+ && md5Hex.equals(prev.getArchiveMd5Hex())) {
+ next.put(name, prev);
+ } else {
+ if (prev != null) {
+ log.info()
+ .attr("connector", name)
+ .attr("archive", archive)
+ .attr("previousArchive",
prev.getArchivePath())
+ .log("Reloading changed connector");
+ toClose.add(prev);
+ }
+ next.put(name, new Connector(archive, cntDef,
narExtractionDirectory, enableClassloading,
+ md5Hex));
+ }
+ } catch (Throwable t) {
+ log.warn()
+ .attr("archive", archive)
+ .exception(t)
+ .log("Failed to load connector");
+ }
+ }
+ }
+ toClose.addAll(remaining.values());
+ return new ReloadConnectorsResult(next, toClose);
+ }
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
new file mode 100644
index 00000000000..cdfab35692c
--- /dev/null
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ReloadConnectorsResult.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.io;
+
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * Result of {@link ConnectorUtils#reloadConnectors}: the new connector map
and connectors evicted from the active set
+ * that the caller must close.
+ */
+public record ReloadConnectorsResult(TreeMap<String, Connector> connectors,
List<Connector> connectorsToClose) {
+}
diff --git
a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
new file mode 100644
index 00000000000..07b6459369a
--- /dev/null
+++
b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/io/ConnectorUtilsReloadTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils.io;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotSame;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.TreeMap;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.testng.annotations.Test;
+
+@Test
+public class ConnectorUtilsReloadTest {
+
+ private static void closeEvicted(ReloadConnectorsResult reload) throws
Exception {
+ for (Connector c : reload.connectorsToClose()) {
+ c.close();
+ }
+ }
+
+ private static void writeMinimalNar(Path narPath, ConnectorDefinition def)
throws IOException {
+ byte[] yaml =
ObjectMapperFactory.getYamlMapper().getObjectMapper().writeValueAsBytes(def);
+ try (OutputStream os = Files.newOutputStream(narPath);
+ ZipOutputStream zos = new ZipOutputStream(os)) {
+ ZipEntry entry = new ZipEntry("META-INF/services/pulsar-io.yaml");
+ zos.putNextEntry(entry);
+ zos.write(yaml);
+ zos.closeEntry();
+ }
+ }
+
+ private static ConnectorDefinition sampleDefinition(String name) {
+ ConnectorDefinition def = new ConnectorDefinition();
+ def.setName(name);
+ def.setSinkClass("org.example.Sink");
+ def.setSourceClass("org.example.Source");
+ return def;
+ }
+
+ /**
+ * Historical {@code ConnectorsManager} reload replaced the whole map and
closed every prior
+ * {@link Connector}, even when NAR files were unchanged. A caller keeping
a reference to the
+ * pre-reload connector would then hit {@link IllegalStateException} on
lazy use.
+ * <p>
+ * Incremental reload must evict nothing, reuse the same instance, and
leave that instance usable
+ * after the caller closes only {@link
ReloadConnectorsResult#connectorsToClose()}.
+ */
+ @Test
+ public void reloadUnchangedNarEvictsNothingAndKeepsSameConnectorUsable()
throws Exception {
+ Path dir = Files.createTempDirectory("conn-reload-");
+ Path nar = dir.resolve("c1.nar");
+ writeMinimalNar(nar, sampleDefinition("c-one"));
+
+ TreeMap<String, Connector> first =
+ ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ Connector c1 = first.get("c-one");
+ c1.getConnectorFunctionPackage();
+
+ ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+ first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ assertTrue(reload.connectorsToClose().isEmpty());
+ closeEvicted(reload);
+ TreeMap<String, Connector> second = reload.connectors();
+
+ assertSame(second.get("c-one"), c1);
+ c1.getConnectorFunctionPackage();
+ }
+
+ @Test
+ public void reloadReopensConnectorWhenNarContentChanges() throws Exception
{
+ Path dir = Files.createTempDirectory("conn-reload-");
+ Path nar = dir.resolve("c1.nar");
+ writeMinimalNar(nar, sampleDefinition("c-one"));
+
+ TreeMap<String, Connector> first =
+ ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ Connector before = first.get("c-one");
+
+ ConnectorDefinition updated = sampleDefinition("c-one");
+ updated.setDescription("changed");
+ writeMinimalNar(nar, updated);
+
+ ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+ first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ closeEvicted(reload);
+ TreeMap<String, Connector> second = reload.connectors();
+
+ assertNotSame(second.get("c-one"), before);
+ assertThrows(IllegalStateException.class,
before::getConnectorFunctionPackage);
+ }
+
+ @Test
+ public void reloadClosesConnectorsRemovedFromDirectory() throws Exception {
+ Path dir = Files.createTempDirectory("conn-reload-");
+ Path nar1 = dir.resolve("a.nar");
+ Path nar2 = dir.resolve("b.nar");
+ writeMinimalNar(nar1, sampleDefinition("conn-a"));
+ writeMinimalNar(nar2, sampleDefinition("conn-b"));
+
+ TreeMap<String, Connector> first =
+ ConnectorUtils.searchForConnectors(dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ Connector removed = first.get("conn-b");
+ Files.delete(nar2);
+
+ ReloadConnectorsResult reload = ConnectorUtils.reloadConnectors(
+ first, dir.toString(),
NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, false);
+ closeEvicted(reload);
+ TreeMap<String, Connector> second = reload.connectors();
+
+ assertEquals(second.size(), 1);
+ assertSame(second.get("conn-a"), first.get("conn-a"));
+ assertThrows(IllegalStateException.class,
removed::getConnectorFunctionPackage);
+ }
+
+}