This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0ef1394a604 MINOR: Add plugin.path config non-empty check in
connect-plugin-path.sh (#20638)
0ef1394a604 is described below
commit 0ef1394a604a36724e4d097f00164933ec2433de
Author: majialong <[email protected]>
AuthorDate: Fri Nov 14 18:02:36 2025 +0800
MINOR: Add plugin.path config non-empty check in connect-plugin-path.sh
(#20638)
[KAFKA-19112](https://github.com/apache/kafka/pull/20334) defines the
`plugin.path` config as non-empty. This PR add validation for
`plugin.path` related config in the `connect-plugin-path.sh` tool to
satisfy the same non-empty semantics, addressing the issue [mentioned
here](https://github.com/apache/kafka/pull/20334/files#r2395222944).
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../connect/runtime/isolation/PluginUtils.java | 3 --
.../org/apache/kafka/tools/ConnectPluginPath.java | 25 +++++++++-
.../apache/kafka/tools/ConnectPluginPathTest.java | 57 ++++++++++++++++++++++
3 files changed, 81 insertions(+), 4 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 729074d508e..ac7deaa4b35 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -209,9 +209,6 @@ public class PluginUtils {
for (String path : pluginPathElements) {
try {
Path pluginPathElement = Paths.get(path).toAbsolutePath();
- if (pluginPath.isEmpty()) {
- log.warn("Plugin path element is empty, evaluating to
{}.", pluginPathElement);
- }
if (!Files.exists(pluginPathElement)) {
throw new
FileNotFoundException(pluginPathElement.toString());
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
index a3da1d3a9c6..73987b6746c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -52,6 +53,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -68,6 +70,8 @@ public class ConnectPluginPath {
};
public static final String NO_ALIAS = "N/A";
+ private static final Pattern COMMA_WITH_WHITESPACE =
Pattern.compile("\\s*,\\s*");
+
public static void main(String[] args) {
Exit.exit(mainNoExit(args, System.out, System.err));
}
@@ -82,7 +86,7 @@ public class ConnectPluginPath {
} catch (ArgumentParserException e) {
parser.handleError(e);
return 1;
- } catch (TerseException e) {
+ } catch (TerseException | ConfigException e) {
err.println(e.getMessage());
return 2;
} catch (Throwable e) {
@@ -162,6 +166,9 @@ public class ConnectPluginPath {
if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() &&
rawWorkerConfigs.isEmpty()) {
throw new ArgumentParserException("Must specify at least one
--plugin-location, --plugin-path, or --worker-config", parser);
}
+ for (String pluginPath : rawPluginPaths) {
+ validatePluginPath(pluginPath, "--plugin-path");
+ }
Set<Path> pluginLocations = new LinkedHashSet<>();
for (String rawWorkerConfig : rawWorkerConfigs) {
Properties properties;
@@ -172,6 +179,7 @@ public class ConnectPluginPath {
}
String pluginPath =
properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG);
if (pluginPath != null) {
+ validatePluginPath(pluginPath,
WorkerConfig.PLUGIN_PATH_CONFIG);
rawPluginPaths.add(pluginPath);
}
}
@@ -192,6 +200,21 @@ public class ConnectPluginPath {
return pluginLocations;
}
+ private static void validatePluginPath(String pluginPath, String
configName) throws ConfigException {
+ String trimmed = pluginPath.trim();
+ if (trimmed.isEmpty()) {
+ throw new ConfigException("'" + configName + "' must not be
empty.");
+ }
+
+ String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(trimmed, -1);
+
+ for (String path : pluginPathElements) {
+ if (path.isEmpty()) {
+ throw new ConfigException("'" + configName + "' values must
not be empty.");
+ }
+ }
+ }
+
enum Command {
LIST, SYNC_MANIFESTS
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
index d10cf7b2e45..aff3f734ba7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
@@ -333,6 +333,63 @@ public class ConnectPluginPathTest {
assertBadPackagingPluginsStatus(table, false);
}
+ @Test
+ public void testListEmptyPluginPathArg() {
+ CommandResult res = runCommand(
+ "list",
+ "--plugin-path",
+ ""
+ );
+ assertNotEquals(0, res.returnCode);
+ assertEquals("'--plugin-path' must not be empty.\n", res.err);
+ }
+
+ @Test
+ public void testListEmptyPluginPathElementArg() {
+ CommandResult res = runCommand(
+ "list",
+ "--plugin-path",
+ "location-a,,location-b"
+ );
+ assertNotEquals(0, res.returnCode);
+ assertEquals("'--plugin-path' values must not be empty.\n", res.err);
+ }
+
+ @Test
+ public void testListEmptyPluginPathInWorkerConfig() {
+ Path configPath = workspace.resolve("worker-empty.properties");
+ try {
+ Files.writeString(configPath, "plugin.path=",
StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ fail("Failed to create test worker config: " + e.getMessage());
+ }
+
+ CommandResult res = runCommand(
+ "list",
+ "--worker-config",
+ configPath.toString()
+ );
+ assertNotEquals(0, res.returnCode);
+ assertEquals("'plugin.path' must not be empty.\n", res.err);
+ }
+
+ @Test
+ public void testListEmptyPluginPathElementInWorkerConfig() {
+ Path configPath = workspace.resolve("worker-empty-element.properties");
+ try {
+ Files.writeString(configPath,
"plugin.path=location-a,,location-b", StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ fail("Failed to create test worker config: " + e.getMessage());
+ }
+
+ CommandResult res = runCommand(
+ "list",
+ "--worker-config",
+ configPath.toString()
+ );
+ assertNotEquals(0, res.returnCode);
+ assertEquals("'plugin.path' values must not be empty.\n", res.err);
+ }
private static Map<String, List<String[]>> assertListSuccess(CommandResult
result) {
assertEquals(0, result.returnCode);