This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b088307612b KAFKA-15473: Hide duplicate plugins in /connector-plugins 
(#14398)
b088307612b is described below

commit b088307612b59d9864fb7e3096dc9a0b47d7273d
Author: Greg Harris <[email protected]>
AuthorDate: Tue Sep 19 09:37:21 2023 -0700

    KAFKA-15473: Hide duplicate plugins in /connector-plugins (#14398)
    
    Reviewers: Yash Mayya <[email protected]>, Sagar Rao 
<[email protected]>, Hector Geraldino <[email protected]>, Chris 
Egerton <[email protected]>
---
 .../rest/resources/ConnectorPluginsResource.java    |  8 +++++---
 .../resources/ConnectorPluginsResourceTest.java     | 21 ++++++++++++++++++---
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 947c467ae1a..037d98b68e6 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -42,8 +42,10 @@ import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -55,12 +57,12 @@ public class ConnectorPluginsResource implements 
ConnectResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
-    private final List<PluginInfo> connectorPlugins;
+    private final Set<PluginInfo> connectorPlugins;
     private long requestTimeoutMs;
 
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
-        this.connectorPlugins = new ArrayList<>();
+        this.connectorPlugins = new LinkedHashSet<>();
         this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
 
         // TODO: improve once plugins are allowed to be added/removed during 
runtime.
@@ -126,7 +128,7 @@ public class ConnectorPluginsResource implements 
ConnectResource {
                         .filter(p -> 
PluginType.SINK.toString().equals(p.type()) || 
PluginType.SOURCE.toString().equals(p.type()))
                         .collect(Collectors.toList()));
             } else {
-                return Collections.unmodifiableList(connectorPlugins);
+                return Collections.unmodifiableList(new 
ArrayList<>(connectorPlugins));
             }
         }
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c39017adc40..52ac14ca1cd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -34,6 +34,8 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.SampleSinkConnector;
 import org.apache.kafka.connect.runtime.SampleSourceConnector;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -62,9 +64,11 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.ws.rs.BadRequestException;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -120,6 +124,7 @@ public class ConnectorPluginsResourceTest {
     static {
         try {
             ClassLoader classLoader = 
ConnectorPluginsResourceTest.class.getClassLoader();
+            ClassLoader pluginClassLoader = new 
PluginClassLoader(DelegatingClassLoaderTest.ARBITRARY_URL, new URL[]{}, 
classLoader);
             String appVersion = AppInfoParser.getVersion();
             SINK_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, 
classLoader));
             SINK_CONNECTOR_PLUGINS.add(new 
PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, 
classLoader));
@@ -132,6 +137,10 @@ public class ConnectorPluginsResourceTest {
             CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, 
appVersion, PluginType.CONVERTER, classLoader));
             CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, 
appVersion, PluginType.CONVERTER, classLoader));
 
+            // Same class, version, and type, but loaded from a different 
classloader
+            CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, 
appVersion, PluginType.CONVERTER, pluginClassLoader));
+            CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, 
appVersion, PluginType.CONVERTER, pluginClassLoader));
+
             HEADER_CONVERTER_PLUGINS.add(new 
PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER, 
classLoader));
             HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, 
appVersion, PluginType.HEADER_CONVERTER, classLoader));
 
@@ -386,7 +395,7 @@ public class ConnectorPluginsResourceTest {
 
     @Test
     public void testListAllPlugins() {
-        Set<PluginInfo> expectedConnectorPlugins = Stream.of(
+        List<PluginInfo> expectedConnectorPlugins = Stream.of(
                         SINK_CONNECTOR_PLUGINS,
                         SOURCE_CONNECTOR_PLUGINS,
                         CONVERTER_PLUGINS,
@@ -395,8 +404,14 @@ public class ConnectorPluginsResourceTest {
                         PREDICATE_PLUGINS
                 ).flatMap(Collection::stream)
                 .map(PluginInfo::new)
-                .collect(Collectors.toSet());
-        Set<PluginInfo> actualConnectorPlugins = new 
HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
+                .distinct()
+                .collect(Collectors.toList());
+        List<PluginInfo> actualConnectorPlugins = new 
ArrayList<>(connectorPluginsResource.listConnectorPlugins(false));
+        Comparator<PluginInfo> compare = 
Comparator.comparing(PluginInfo::className)
+                .thenComparing(PluginInfo::type)
+                .thenComparing(PluginInfo::version);
+        actualConnectorPlugins.sort(compare);
+        expectedConnectorPlugins.sort(compare);
         assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
         verify(herder, atLeastOnce()).plugins();
     }

Reply via email to