This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dcb76b9417 [Fix][Plugin] Optimize the plugin discovery mechanism (#8603)
dcb76b9417 is described below
commit dcb76b94178e28a4fb8f59fa056773520dc77924
Author: corgy-w <[email protected]>
AuthorDate: Tue Feb 25 13:58:39 2025 +0800
[Fix][Plugin] Optimize the plugin discovery mechanism (#8603)
---
.../plugin/discovery/AbstractPluginDiscovery.java | 160 ++++++++-------------
.../SeaTunnelSourcePluginDiscoveryTest.java | 63 +++++++-
.../duplicate/connectors/plugin-mapping.properties | 16 ++-
3 files changed, 133 insertions(+), 106 deletions(-)
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index a946700c75..0bc777be0e 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -36,6 +36,7 @@ import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -44,7 +45,6 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
@@ -55,13 +55,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -90,6 +88,9 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>>
pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+ protected final Map<PluginIdentifier, String> sourcePluginInstance;
+ protected final Map<PluginIdentifier, String> sinkPluginInstance;
+ protected final Map<PluginIdentifier, String> transformPluginInstance;
public AbstractPluginDiscovery(BiConsumer<ClassLoader, URL>
addURLToClassloader) {
this(Common.connectorDir(), loadConnectorPluginConfig(),
addURLToClassloader);
@@ -114,6 +115,9 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
this.pluginDir = pluginDir;
this.pluginMappingConfig = pluginMappingConfig;
this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
+ this.sourcePluginInstance = getAllSupportedPlugins(PluginType.SOURCE);
+ this.sinkPluginInstance = getAllSupportedPlugins(PluginType.SINK);
+ this.transformPluginInstance =
getAllSupportedPlugins(PluginType.TRANSFORM);
log.info("Load {} Plugin from {}",
getPluginBaseClass().getSimpleName(), pluginDir);
}
@@ -423,14 +427,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
pluginDir
.toFile()
.listFiles(
- new FileFilter() {
- @Override
- public boolean accept(File pathname) {
- return
pathname.getName().endsWith(".jar")
+ pathname ->
+ pathname.getName().endsWith(".jar")
&&
StringUtils.startsWithIgnoreCase(
- pathname.getName(),
pluginJarPrefix);
- }
- });
+ pathname.getName(),
pluginJarPrefix));
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
@@ -439,10 +439,9 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
if (targetPluginFiles.length == 1) {
pluginJarPath = targetPluginFiles[0].toURI().toURL();
} else {
+ PluginType type = PluginType.valueOf(pluginType.toUpperCase());
pluginJarPath =
- findMostSimlarPluginJarFile(targetPluginFiles,
pluginJarPrefix)
- .toURI()
- .toURL();
+ selectPluginJar(targetPluginFiles, pluginJarPrefix,
pluginName, type).get();
}
log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier,
pluginJarPath);
return Optional.of(pluginJarPath);
@@ -455,104 +454,59 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
}
}
- private static File findMostSimlarPluginJarFile(
- File[] targetPluginFiles, String pluginJarPrefix) {
- String splitRegex = "\\-|\\_|\\.";
- double maxSimlarity = -Integer.MAX_VALUE;
- int mostSimlarPluginJarFileIndex = -1;
- for (int i = 0; i < targetPluginFiles.length; i++) {
- File file = targetPluginFiles[i];
- String fileName = file.getName();
- double similarity =
- CosineSimilarityUtil.cosineSimilarity(pluginJarPrefix,
fileName, splitRegex);
- if (similarity > maxSimlarity) {
- maxSimlarity = similarity;
- mostSimlarPluginJarFileIndex = i;
- }
+ private Optional<URL> selectPluginJar(
+ File[] targetPluginFiles, String pluginJarPrefix, String
pluginName, PluginType type) {
+ List<URL> resMatchedUrls = new ArrayList<>();
+ for (File file : targetPluginFiles) {
+ Optional<URL> matchedUrl = findMatchingUrl(file, type);
+ matchedUrl.ifPresent(resMatchedUrls::add);
+ }
+ if (resMatchedUrls.size() != 1) {
+ throw new SeaTunnelException(
+ String.format(
+ "Cannot find unique plugin jar for
pluginIdentifier: %s -> %s. Possible impact jar: %s",
+ pluginName, pluginJarPrefix,
Arrays.asList(targetPluginFiles)));
+ } else {
+ return Optional.of(resMatchedUrls.get(0));
}
- return targetPluginFiles[mostSimlarPluginJarFileIndex];
}
- static class CosineSimilarityUtil {
- public static double cosineSimilarity(String textA, String textB,
String splitRegrex) {
- Set<String> words1 =
- new
HashSet<>(Arrays.asList(textA.toLowerCase().split(splitRegrex)));
- Set<String> words2 =
- new
HashSet<>(Arrays.asList(textB.toLowerCase().split(splitRegrex)));
- int[] termFrequency1 = calculateTermFrequencyVector(textA, words1,
splitRegrex);
- int[] termFrequency2 = calculateTermFrequencyVector(textB, words2,
splitRegrex);
- return calculateCosineSimilarity(termFrequency1, termFrequency2);
+ private Optional<URL> findMatchingUrl(File file, PluginType type) {
+ Map<PluginIdentifier, String> pluginInstanceMap = null;
+ switch (type) {
+ case SINK:
+ pluginInstanceMap = sinkPluginInstance;
+ break;
+ case SOURCE:
+ pluginInstanceMap = sourcePluginInstance;
+ break;
+ case TRANSFORM:
+ pluginInstanceMap = transformPluginInstance;
+ break;
}
-
- private static int[] calculateTermFrequencyVector(
- String text, Set<String> words, String splitRegrex) {
- int[] termFrequencyVector = new int[words.size()];
- String[] textArray = text.toLowerCase().split(splitRegrex);
- List<String> orderedWords = new ArrayList<String>();
- words.clear();
- for (String word : textArray) {
- if (!words.contains(word)) {
- orderedWords.add(word);
- words.add(word);
- }
- }
- for (String word : textArray) {
- if (words.contains(word)) {
- int index = 0;
- for (String w : orderedWords) {
- if (w.equals(word)) {
- termFrequencyVector[index]++;
- break;
- }
- index++;
- }
- }
- }
- return termFrequencyVector;
+ if (pluginInstanceMap == null) {
+ return Optional.empty();
}
-
- private static double calculateCosineSimilarity(int[] vectorA, int[]
vectorB) {
- double dotProduct = 0.0;
- double magnitudeA = 0.0;
- double magnitudeB = 0.0;
- int vectorALength = vectorA.length;
- int vectorBLength = vectorB.length;
- if (vectorALength < vectorBLength) {
- int[] vectorTemp = new int[vectorBLength];
- for (int i = 0; i < vectorB.length; i++) {
- if (i <= vectorALength - 1) {
- vectorTemp[i] = vectorA[i];
- } else {
- vectorTemp[i] = 0;
- }
- }
- vectorA = vectorTemp;
- }
- if (vectorALength > vectorBLength) {
- int[] vectorTemp = new int[vectorALength];
- for (int i = 0; i < vectorA.length; i++) {
- if (i <= vectorBLength - 1) {
- vectorTemp[i] = vectorB[i];
- } else {
- vectorTemp[i] = 0;
- }
- }
- vectorB = vectorTemp;
- }
- for (int i = 0; i < vectorA.length; i++) {
- dotProduct += vectorA[i] * vectorB[i];
- magnitudeA += Math.pow(vectorA[i], 2);
- magnitudeB += Math.pow(vectorB[i], 2);
+ List<PluginIdentifier> matchedIdentifier = new ArrayList<>();
+ for (Map.Entry<PluginIdentifier, String> entry :
pluginInstanceMap.entrySet()) {
+ if (file.getName().startsWith(entry.getValue())) {
+ matchedIdentifier.add(entry.getKey());
}
+ }
- magnitudeA = Math.sqrt(magnitudeA);
- magnitudeB = Math.sqrt(magnitudeB);
-
- if (magnitudeA == 0 || magnitudeB == 0) {
- return 0.0; // Avoid dividing by 0
- } else {
- return dotProduct / (magnitudeA * magnitudeB);
+ if (matchedIdentifier.size() == 1) {
+ try {
+ return Optional.of(file.toURI().toURL());
+ } catch (MalformedURLException e) {
+ log.warn("Cannot get plugin URL for pluginIdentifier: {}",
file, e);
}
}
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "File found: {}, matches more than one PluginIdentifier:
{}",
+ file.getName(),
+ matchedIdentifier);
+ }
+ return Optional.empty();
}
}
diff --git a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index e8511d524b..023a508e02 100644
--- a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++ b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -53,7 +54,13 @@ class SeaTunnelSourcePluginDiscoveryTest {
Paths.get(seatunnelHome, "connectors",
"connector-http.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-kafka.jar"),
Paths.get(seatunnelHome, "connectors",
"connector-kafka-alcs.jar"),
- Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar"));
+ Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-jdbc-release-1.1.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-jdbc-hive1.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-odbc-baidu-v1.jar"),
+ Paths.get(seatunnelHome, "connectors",
"connector-odbc-baidu-release-1.1.jar"),
+ Paths.get(seatunnelHome, "connectors",
"seatunnel-transforms-v2.jar"),
+ Paths.get(seatunnelHome, "connectors",
"seatunnel-transforms-v1.jar"));
@BeforeEach
public void before() throws IOException {
@@ -75,7 +82,8 @@ class SeaTunnelSourcePluginDiscoveryTest {
PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpJira"),
PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "HttpBase"),
PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "Kafka"),
- PluginIdentifier.of("seatunnel",
PluginType.SINK.getType(), "Kafka-Blcs"));
+ PluginIdentifier.of("seatunnel",
PluginType.SINK.getType(), "Kafka-Blcs"),
+ PluginIdentifier.of("seatunnel",
PluginType.SINK.getType(), "Jdbc"));
SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
new SeaTunnelSourcePluginDiscovery();
Assertions.assertIterableEquals(
@@ -87,6 +95,57 @@ class SeaTunnelSourcePluginDiscoveryTest {
Paths.get(seatunnelHome, "connectors",
"connector-kafka.jar")
.toString(),
Paths.get(seatunnelHome, "connectors",
"connector-kafka-blcs.jar")
+ .toString(),
+ Paths.get(
+ seatunnelHome,
+ "connectors",
+
"connector-jdbc-release-1.1.jar")
+ .toString())
+ .collect(Collectors.toList()),
+
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
+ .map(URL::getPath)
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ void getPluginBaseClassFailureScenario() {
+ List<PluginIdentifier> pluginIdentifiers =
+ Lists.newArrayList(
+ PluginIdentifier.of("seatunnel",
PluginType.SOURCE.getType(), "Odbc"));
+ SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery();
+ Exception exception =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
+ System.out.println(exception.getMessage());
+ Assertions.assertTrue(
+ exception
+ .getMessage()
+ .matches(
+ "Cannot find unique plugin jar for
pluginIdentifier: odbc -> connector-odbc. "
+ + "Possible impact jar: \\[.*.jar,
.*.jar]"));
+ }
+
+ @Test
+ void getTransformClass() {
+ List<PluginIdentifier> pluginIdentifiers =
+ Lists.newArrayList(
+ PluginIdentifier.of("seatunnel",
PluginType.TRANSFORM.getType(), "Sql"),
+ PluginIdentifier.of("seatunnel",
PluginType.TRANSFORM.getType(), "Filter"));
+ SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+ new SeaTunnelSourcePluginDiscovery();
+ Assertions.assertIterableEquals(
+ Stream.of(
+ Paths.get(
+ seatunnelHome,
+ "connectors",
+ "seatunnel-transforms-v2.jar")
+ .toString(),
+ Paths.get(
+ seatunnelHome,
+ "connectors",
+ "seatunnel-transforms-v1.jar")
.toString())
.collect(Collectors.toList()),
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
diff --git a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
index ea20ad05b0..0f75af0460 100644
--- a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
+++ b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -24,4 +24,18 @@ seatunnel.sink.Kafka = connector-kafka
seatunnel.source.Kafka-Alcs = connector-kafka-alcs
seatunnel.sink.Kafka-Alcs = connector-kafka-alcs
seatunnel.source.Kafka-Blcs = connector-kafka-blcs
-seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
\ No newline at end of file
+seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
+seatunnel.source.Jdbc = connector-jdbc
+seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.source.Hive1-Jdbc = connector-jdbc-hive1
+seatunnel.sink.Hive1-Jdbc = connector-jdbc-hive1
+seatunnel.source.Odbc = connector-odbc
+seatunnel.sink.Odbc = connector-odbc
+seatunnel.source.Baidu-Odbc = connector-odbc-baidu
+seatunnel.sink.Baidu-Odbc = connector-odbc-baidu
+
+seatunnel.transform.Sql = seatunnel-transforms-v2
+seatunnel.transform.FieldMapper = seatunnel-transforms-v2
+seatunnel.transform.Filter = seatunnel-transforms-v1
+seatunnel.transform.FilterRowKind = seatunnel-transforms-v1
+