This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dcb76b9417 [Fix][Plugin] Optimize the plugin discovery mechanism (#8603)
dcb76b9417 is described below

commit dcb76b94178e28a4fb8f59fa056773520dc77924
Author: corgy-w <[email protected]>
AuthorDate: Tue Feb 25 13:58:39 2025 +0800

    [Fix][Plugin] Optimize the plugin discovery mechanism (#8603)
---
 .../plugin/discovery/AbstractPluginDiscovery.java  | 160 ++++++++-------------
 .../SeaTunnelSourcePluginDiscoveryTest.java        |  63 +++++++-
 .../duplicate/connectors/plugin-mapping.properties |  16 ++-
 3 files changed, 133 insertions(+), 106 deletions(-)

diff --git 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index a946700c75..0bc777be0e 100644
--- 
a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ 
b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -36,6 +36,7 @@ import 
org.apache.seatunnel.common.constants.CollectionConstants;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -44,7 +45,6 @@ import org.apache.commons.lang3.tuple.ImmutableTriple;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -55,13 +55,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.ServiceLoader;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -90,6 +88,9 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
     private final BiConsumer<ClassLoader, URL> addURLToClassLoaderConsumer;
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> 
pluginJarPath =
             new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+    protected final Map<PluginIdentifier, String> sourcePluginInstance;
+    protected final Map<PluginIdentifier, String> sinkPluginInstance;
+    protected final Map<PluginIdentifier, String> transformPluginInstance;
 
     public AbstractPluginDiscovery(BiConsumer<ClassLoader, URL> 
addURLToClassloader) {
         this(Common.connectorDir(), loadConnectorPluginConfig(), 
addURLToClassloader);
@@ -114,6 +115,9 @@ public abstract class AbstractPluginDiscovery<T> implements 
PluginDiscovery<T> {
         this.pluginDir = pluginDir;
         this.pluginMappingConfig = pluginMappingConfig;
         this.addURLToClassLoaderConsumer = addURLToClassLoaderConsumer;
+        this.sourcePluginInstance = getAllSupportedPlugins(PluginType.SOURCE);
+        this.sinkPluginInstance = getAllSupportedPlugins(PluginType.SINK);
+        this.transformPluginInstance = 
getAllSupportedPlugins(PluginType.TRANSFORM);
         log.info("Load {} Plugin from {}", 
getPluginBaseClass().getSimpleName(), pluginDir);
     }
 
@@ -423,14 +427,10 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
                 pluginDir
                         .toFile()
                         .listFiles(
-                                new FileFilter() {
-                                    @Override
-                                    public boolean accept(File pathname) {
-                                        return 
pathname.getName().endsWith(".jar")
+                                pathname ->
+                                        pathname.getName().endsWith(".jar")
                                                 && 
StringUtils.startsWithIgnoreCase(
-                                                        pathname.getName(), 
pluginJarPrefix);
-                                    }
-                                });
+                                                        pathname.getName(), 
pluginJarPrefix));
         if (ArrayUtils.isEmpty(targetPluginFiles)) {
             return Optional.empty();
         }
@@ -439,10 +439,9 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
             if (targetPluginFiles.length == 1) {
                 pluginJarPath = targetPluginFiles[0].toURI().toURL();
             } else {
+                PluginType type = PluginType.valueOf(pluginType.toUpperCase());
                 pluginJarPath =
-                        findMostSimlarPluginJarFile(targetPluginFiles, 
pluginJarPrefix)
-                                .toURI()
-                                .toURL();
+                        selectPluginJar(targetPluginFiles, pluginJarPrefix, 
pluginName, type).get();
             }
             log.info("Discovery plugin jar for: {} at: {}", pluginIdentifier, 
pluginJarPath);
             return Optional.of(pluginJarPath);
@@ -455,104 +454,59 @@ public abstract class AbstractPluginDiscovery<T> 
implements PluginDiscovery<T> {
         }
     }
 
-    private static File findMostSimlarPluginJarFile(
-            File[] targetPluginFiles, String pluginJarPrefix) {
-        String splitRegex = "\\-|\\_|\\.";
-        double maxSimlarity = -Integer.MAX_VALUE;
-        int mostSimlarPluginJarFileIndex = -1;
-        for (int i = 0; i < targetPluginFiles.length; i++) {
-            File file = targetPluginFiles[i];
-            String fileName = file.getName();
-            double similarity =
-                    CosineSimilarityUtil.cosineSimilarity(pluginJarPrefix, 
fileName, splitRegex);
-            if (similarity > maxSimlarity) {
-                maxSimlarity = similarity;
-                mostSimlarPluginJarFileIndex = i;
-            }
+    private Optional<URL> selectPluginJar(
+            File[] targetPluginFiles, String pluginJarPrefix, String 
pluginName, PluginType type) {
+        List<URL> resMatchedUrls = new ArrayList<>();
+        for (File file : targetPluginFiles) {
+            Optional<URL> matchedUrl = findMatchingUrl(file, type);
+            matchedUrl.ifPresent(resMatchedUrls::add);
+        }
+        if (resMatchedUrls.size() != 1) {
+            throw new SeaTunnelException(
+                    String.format(
+                            "Cannot find unique plugin jar for 
pluginIdentifier: %s -> %s. Possible impact jar: %s",
+                            pluginName, pluginJarPrefix, 
Arrays.asList(targetPluginFiles)));
+        } else {
+            return Optional.of(resMatchedUrls.get(0));
         }
-        return targetPluginFiles[mostSimlarPluginJarFileIndex];
     }
 
-    static class CosineSimilarityUtil {
-        public static double cosineSimilarity(String textA, String textB, 
String splitRegrex) {
-            Set<String> words1 =
-                    new 
HashSet<>(Arrays.asList(textA.toLowerCase().split(splitRegrex)));
-            Set<String> words2 =
-                    new 
HashSet<>(Arrays.asList(textB.toLowerCase().split(splitRegrex)));
-            int[] termFrequency1 = calculateTermFrequencyVector(textA, words1, 
splitRegrex);
-            int[] termFrequency2 = calculateTermFrequencyVector(textB, words2, 
splitRegrex);
-            return calculateCosineSimilarity(termFrequency1, termFrequency2);
+    private Optional<URL> findMatchingUrl(File file, PluginType type) {
+        Map<PluginIdentifier, String> pluginInstanceMap = null;
+        switch (type) {
+            case SINK:
+                pluginInstanceMap = sinkPluginInstance;
+                break;
+            case SOURCE:
+                pluginInstanceMap = sourcePluginInstance;
+                break;
+            case TRANSFORM:
+                pluginInstanceMap = transformPluginInstance;
+                break;
         }
-
-        private static int[] calculateTermFrequencyVector(
-                String text, Set<String> words, String splitRegrex) {
-            int[] termFrequencyVector = new int[words.size()];
-            String[] textArray = text.toLowerCase().split(splitRegrex);
-            List<String> orderedWords = new ArrayList<String>();
-            words.clear();
-            for (String word : textArray) {
-                if (!words.contains(word)) {
-                    orderedWords.add(word);
-                    words.add(word);
-                }
-            }
-            for (String word : textArray) {
-                if (words.contains(word)) {
-                    int index = 0;
-                    for (String w : orderedWords) {
-                        if (w.equals(word)) {
-                            termFrequencyVector[index]++;
-                            break;
-                        }
-                        index++;
-                    }
-                }
-            }
-            return termFrequencyVector;
+        if (pluginInstanceMap == null) {
+            return Optional.empty();
         }
-
-        private static double calculateCosineSimilarity(int[] vectorA, int[] 
vectorB) {
-            double dotProduct = 0.0;
-            double magnitudeA = 0.0;
-            double magnitudeB = 0.0;
-            int vectorALength = vectorA.length;
-            int vectorBLength = vectorB.length;
-            if (vectorALength < vectorBLength) {
-                int[] vectorTemp = new int[vectorBLength];
-                for (int i = 0; i < vectorB.length; i++) {
-                    if (i <= vectorALength - 1) {
-                        vectorTemp[i] = vectorA[i];
-                    } else {
-                        vectorTemp[i] = 0;
-                    }
-                }
-                vectorA = vectorTemp;
-            }
-            if (vectorALength > vectorBLength) {
-                int[] vectorTemp = new int[vectorALength];
-                for (int i = 0; i < vectorA.length; i++) {
-                    if (i <= vectorBLength - 1) {
-                        vectorTemp[i] = vectorB[i];
-                    } else {
-                        vectorTemp[i] = 0;
-                    }
-                }
-                vectorB = vectorTemp;
-            }
-            for (int i = 0; i < vectorA.length; i++) {
-                dotProduct += vectorA[i] * vectorB[i];
-                magnitudeA += Math.pow(vectorA[i], 2);
-                magnitudeB += Math.pow(vectorB[i], 2);
+        List<PluginIdentifier> matchedIdentifier = new ArrayList<>();
+        for (Map.Entry<PluginIdentifier, String> entry : 
pluginInstanceMap.entrySet()) {
+            if (file.getName().startsWith(entry.getValue())) {
+                matchedIdentifier.add(entry.getKey());
             }
+        }
 
-            magnitudeA = Math.sqrt(magnitudeA);
-            magnitudeB = Math.sqrt(magnitudeB);
-
-            if (magnitudeA == 0 || magnitudeB == 0) {
-                return 0.0; // Avoid dividing by 0
-            } else {
-                return dotProduct / (magnitudeA * magnitudeB);
+        if (matchedIdentifier.size() == 1) {
+            try {
+                return Optional.of(file.toURI().toURL());
+            } catch (MalformedURLException e) {
+                log.warn("Cannot get plugin URL for pluginIdentifier: {}", 
file, e);
             }
         }
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "File found: {}, matches more than one PluginIdentifier: 
{}",
+                    file.getName(),
+                    matchedIdentifier);
+        }
+        return Optional.empty();
     }
 }
diff --git 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
index e8511d524b..023a508e02 100644
--- 
a/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
+++ 
b/seatunnel-plugin-discovery/src/test/java/org/apache/seatunnel/plugin/discovery/seatunnel/SeaTunnelSourcePluginDiscoveryTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.PluginIdentifier;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -53,7 +54,13 @@ class SeaTunnelSourcePluginDiscoveryTest {
                     Paths.get(seatunnelHome, "connectors", 
"connector-http.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-kafka.jar"),
                     Paths.get(seatunnelHome, "connectors", 
"connector-kafka-alcs.jar"),
-                    Paths.get(seatunnelHome, "connectors", 
"connector-kafka-blcs.jar"));
+                    Paths.get(seatunnelHome, "connectors", 
"connector-kafka-blcs.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-jdbc-release-1.1.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-jdbc-hive1.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-odbc-baidu-v1.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"connector-odbc-baidu-release-1.1.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"seatunnel-transforms-v2.jar"),
+                    Paths.get(seatunnelHome, "connectors", 
"seatunnel-transforms-v1.jar"));
 
     @BeforeEach
     public void before() throws IOException {
@@ -75,7 +82,8 @@ class SeaTunnelSourcePluginDiscoveryTest {
                         PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "HttpJira"),
                         PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "HttpBase"),
                         PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "Kafka"),
-                        PluginIdentifier.of("seatunnel", 
PluginType.SINK.getType(), "Kafka-Blcs"));
+                        PluginIdentifier.of("seatunnel", 
PluginType.SINK.getType(), "Kafka-Blcs"),
+                        PluginIdentifier.of("seatunnel", 
PluginType.SINK.getType(), "Jdbc"));
         SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
                 new SeaTunnelSourcePluginDiscovery();
         Assertions.assertIterableEquals(
@@ -87,6 +95,57 @@ class SeaTunnelSourcePluginDiscoveryTest {
                                 Paths.get(seatunnelHome, "connectors", 
"connector-kafka.jar")
                                         .toString(),
                                 Paths.get(seatunnelHome, "connectors", 
"connector-kafka-blcs.jar")
+                                        .toString(),
+                                Paths.get(
+                                                seatunnelHome,
+                                                "connectors",
+                                                
"connector-jdbc-release-1.1.jar")
+                                        .toString())
+                        .collect(Collectors.toList()),
+                
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
+                        .map(URL::getPath)
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    void getPluginBaseClassFailureScenario() {
+        List<PluginIdentifier> pluginIdentifiers =
+                Lists.newArrayList(
+                        PluginIdentifier.of("seatunnel", 
PluginType.SOURCE.getType(), "Odbc"));
+        SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+                new SeaTunnelSourcePluginDiscovery();
+        Exception exception =
+                Assertions.assertThrows(
+                        SeaTunnelException.class,
+                        () -> 
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers));
+        System.out.println(exception.getMessage());
+        Assertions.assertTrue(
+                exception
+                        .getMessage()
+                        .matches(
+                                "Cannot find unique plugin jar for 
pluginIdentifier: odbc -> connector-odbc. "
+                                        + "Possible impact jar: \\[.*.jar, 
.*.jar]"));
+    }
+
+    @Test
+    void getTransformClass() {
+        List<PluginIdentifier> pluginIdentifiers =
+                Lists.newArrayList(
+                        PluginIdentifier.of("seatunnel", 
PluginType.TRANSFORM.getType(), "Sql"),
+                        PluginIdentifier.of("seatunnel", 
PluginType.TRANSFORM.getType(), "Filter"));
+        SeaTunnelSourcePluginDiscovery seaTunnelSourcePluginDiscovery =
+                new SeaTunnelSourcePluginDiscovery();
+        Assertions.assertIterableEquals(
+                Stream.of(
+                                Paths.get(
+                                                seatunnelHome,
+                                                "connectors",
+                                                "seatunnel-transforms-v2.jar")
+                                        .toString(),
+                                Paths.get(
+                                                seatunnelHome,
+                                                "connectors",
+                                                "seatunnel-transforms-v1.jar")
                                         .toString())
                         .collect(Collectors.toList()),
                 
seaTunnelSourcePluginDiscovery.getPluginJarPaths(pluginIdentifiers).stream()
diff --git 
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
index ea20ad05b0..0f75af0460 100644
--- 
a/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
+++ 
b/seatunnel-plugin-discovery/src/test/resources/duplicate/connectors/plugin-mapping.properties
@@ -24,4 +24,18 @@ seatunnel.sink.Kafka = connector-kafka
 seatunnel.source.Kafka-Alcs = connector-kafka-alcs
 seatunnel.sink.Kafka-Alcs = connector-kafka-alcs
 seatunnel.source.Kafka-Blcs = connector-kafka-blcs
-seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
\ No newline at end of file
+seatunnel.sink.Kafka-Blcs = connector-kafka-blcs
+seatunnel.source.Jdbc = connector-jdbc
+seatunnel.sink.Jdbc = connector-jdbc
+seatunnel.source.Hive1-Jdbc = connector-jdbc-hive1
+seatunnel.sink.Hive1-Jdbc = connector-jdbc-hive1
+seatunnel.source.Odbc = connector-odbc
+seatunnel.sink.Odbc = connector-odbc
+seatunnel.source.Baidu-Odbc = connector-odbc-baidu
+seatunnel.sink.Baidu-Odbc = connector-odbc-baidu
+
+seatunnel.transform.Sql = seatunnel-transforms-v2
+seatunnel.transform.FieldMapper = seatunnel-transforms-v2
+seatunnel.transform.Filter = seatunnel-transforms-v1
+seatunnel.transform.FilterRowKind = seatunnel-transforms-v1
+

Reply via email to