This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b088307612b KAFKA-15473: Hide duplicate plugins in /connector-plugins
(#14398)
b088307612b is described below
commit b088307612b59d9864fb7e3096dc9a0b47d7273d
Author: Greg Harris <[email protected]>
AuthorDate: Tue Sep 19 09:37:21 2023 -0700
KAFKA-15473: Hide duplicate plugins in /connector-plugins (#14398)
Reviewers: Yash Mayya <[email protected]>, Sagar Rao
<[email protected]>, Hector Geraldino <[email protected]>, Chris
Egerton <[email protected]>
---
.../rest/resources/ConnectorPluginsResource.java | 8 +++++---
.../resources/ConnectorPluginsResourceTest.java | 21 ++++++++++++++++++---
2 files changed, 23 insertions(+), 6 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 947c467ae1a..037d98b68e6 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -42,8 +42,10 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -55,12 +57,12 @@ public class ConnectorPluginsResource implements
ConnectResource {
private static final String ALIAS_SUFFIX = "Connector";
private final Herder herder;
- private final List<PluginInfo> connectorPlugins;
+ private final Set<PluginInfo> connectorPlugins;
private long requestTimeoutMs;
public ConnectorPluginsResource(Herder herder) {
this.herder = herder;
- this.connectorPlugins = new ArrayList<>();
+ this.connectorPlugins = new LinkedHashSet<>();
this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
// TODO: improve once plugins are allowed to be added/removed during
runtime.
@@ -126,7 +128,7 @@ public class ConnectorPluginsResource implements
ConnectResource {
.filter(p ->
PluginType.SINK.toString().equals(p.type()) ||
PluginType.SOURCE.toString().equals(p.type()))
.collect(Collectors.toList()));
} else {
- return Collections.unmodifiableList(connectorPlugins);
+ return Collections.unmodifiableList(new
ArrayList<>(connectorPlugins));
}
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index c39017adc40..52ac14ca1cd 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -34,6 +34,8 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.SampleSinkConnector;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoaderTest;
+import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -62,9 +64,11 @@ import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnitRunner;
import javax.ws.rs.BadRequestException;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -120,6 +124,7 @@ public class ConnectorPluginsResourceTest {
static {
try {
ClassLoader classLoader =
ConnectorPluginsResourceTest.class.getClassLoader();
+ ClassLoader pluginClassLoader = new
PluginClassLoader(DelegatingClassLoaderTest.ARBITRARY_URL, new URL[]{},
classLoader);
String appVersion = AppInfoParser.getVersion();
SINK_CONNECTOR_PLUGINS.add(new
PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK,
classLoader));
SINK_CONNECTOR_PLUGINS.add(new
PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK,
classLoader));
@@ -132,6 +137,10 @@ public class ConnectorPluginsResourceTest {
CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class,
appVersion, PluginType.CONVERTER, classLoader));
CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class,
appVersion, PluginType.CONVERTER, classLoader));
+ // Same class, version, and type, but loaded from a different
classloader
+ CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class,
appVersion, PluginType.CONVERTER, pluginClassLoader));
+ CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class,
appVersion, PluginType.CONVERTER, pluginClassLoader));
+
HEADER_CONVERTER_PLUGINS.add(new
PluginDesc<>(StringConverter.class, appVersion, PluginType.HEADER_CONVERTER,
classLoader));
HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class,
appVersion, PluginType.HEADER_CONVERTER, classLoader));
@@ -386,7 +395,7 @@ public class ConnectorPluginsResourceTest {
@Test
public void testListAllPlugins() {
- Set<PluginInfo> expectedConnectorPlugins = Stream.of(
+ List<PluginInfo> expectedConnectorPlugins = Stream.of(
SINK_CONNECTOR_PLUGINS,
SOURCE_CONNECTOR_PLUGINS,
CONVERTER_PLUGINS,
@@ -395,8 +404,14 @@ public class ConnectorPluginsResourceTest {
PREDICATE_PLUGINS
).flatMap(Collection::stream)
.map(PluginInfo::new)
- .collect(Collectors.toSet());
- Set<PluginInfo> actualConnectorPlugins = new
HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
+ .distinct()
+ .collect(Collectors.toList());
+ List<PluginInfo> actualConnectorPlugins = new
ArrayList<>(connectorPluginsResource.listConnectorPlugins(false));
+ Comparator<PluginInfo> compare =
Comparator.comparing(PluginInfo::className)
+ .thenComparing(PluginInfo::type)
+ .thenComparing(PluginInfo::version);
+ actualConnectorPlugins.sort(compare);
+ expectedConnectorPlugins.sort(compare);
assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
verify(herder, atLeastOnce()).plugins();
}