This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new fd0d43f3c42 KAFKA-16093: Fix spurious REST-related warnings on Connect
startup (#15149)
fd0d43f3c42 is described below
commit fd0d43f3c422b9c3782e7f6c04d59e88e4dcee45
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Jan 10 09:03:23 2024 -0500
KAFKA-16093: Fix spurious REST-related warnings on Connect startup (#15149)
Reviewers: Sagar Rao <[email protected]>, Greg Harris
<[email protected]>
---
checkstyle/import-control.xml | 5 +
.../connect/mirror/rest/MirrorRestServer.java | 23 ++++-
.../rest/resources/InternalMirrorResource.java | 11 +-
.../org/apache/kafka/connect/runtime/Worker.java | 10 +-
.../connect/runtime/rest/ConnectRestServer.java | 26 +++--
.../connect/runtime/rest/HerderRequestHandler.java | 16 +--
...onnectResource.java => RestRequestTimeout.java} | 25 ++---
.../kafka/connect/runtime/rest/RestServer.java | 111 ++++++++++++++-------
.../runtime/rest/resources/ConnectResource.java | 40 --------
.../rest/resources/ConnectorPluginsResource.java | 19 ++--
.../runtime/rest/resources/ConnectorsResource.java | 21 ++--
.../rest/resources/InternalClusterResource.java | 12 +--
.../rest/resources/InternalConnectResource.java | 7 +-
.../runtime/rest/resources/LoggingResource.java | 8 +-
.../runtime/rest/resources/RootResource.java | 10 +-
.../connect/integration/BlockingConnectorTest.java | 4 +-
.../apache/kafka/connect/runtime/WorkerTest.java | 8 +-
.../runtime/rest/ConnectRestServerTest.java | 35 ++++---
.../resources/ConnectorPluginsResourceTest.java | 5 +-
.../rest/resources/ConnectorsResourceTest.java | 16 +--
.../resources/InternalConnectResourceTest.java | 3 +-
21 files changed, 211 insertions(+), 204 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 0843deb1a30..3c3e46f49a3 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -469,6 +469,9 @@
<allow pkg="kafka.server" />
<subpackage name="rest">
<allow pkg="javax.ws.rs" />
+ <allow pkg="javax.inject" />
+ <allow pkg="org.glassfish.jersey" />
+ <allow pkg="org.glassfish.hk2" />
</subpackage>
</subpackage>
@@ -482,6 +485,8 @@
<subpackage name="rest">
<allow pkg="org.eclipse.jetty" />
<allow pkg="javax.ws.rs" />
+ <allow pkg="javax.inject" />
+ <allow pkg="org.glassfish.hk2" />
<allow pkg="javax.servlet" />
<allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" />
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
index 7f1fe2841a3..a5abeff40ce 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java
@@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
+import org.glassfish.hk2.api.TypeLiteral;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
import java.util.Arrays;
import java.util.Collection;
@@ -45,15 +47,28 @@ public class MirrorRestServer extends RestServer {
}
@Override
- protected Collection<ConnectResource> regularResources() {
+ protected Collection<Class<?>> regularResources() {
return Arrays.asList(
- new InternalMirrorResource(herders, restClient)
+ InternalMirrorResource.class
);
}
@Override
- protected Collection<ConnectResource> adminResources() {
+ protected Collection<Class<?>> adminResources() {
return Collections.emptyList();
}
+ @Override
+ protected void configureRegularResources(ResourceConfig resourceConfig) {
+ resourceConfig.register(new Binder());
+ }
+
+ private class Binder extends AbstractBinder {
+ @Override
+ protected void configure() {
+ bind(herders).to(new TypeLiteral<Map<SourceAndTarget, Herder>>() {
});
+ bind(restClient).to(RestClient.class);
+ }
+ }
+
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
index 8b5150f56ac..5c46bd9c6c5 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java
@@ -19,10 +19,12 @@ package org.apache.kafka.connect.mirror.rest.resources;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
@@ -39,8 +41,13 @@ public class InternalMirrorResource extends
InternalClusterResource {
private final Map<SourceAndTarget, Herder> herders;
- public InternalMirrorResource(Map<SourceAndTarget, Herder> herders,
RestClient restClient) {
- super(restClient);
+ @Inject
+ public InternalMirrorResource(
+ Map<SourceAndTarget, Herder> herders,
+ RestClient restClient,
+ RestRequestTimeout requestTimeout
+ ) {
+ super(restClient, requestTimeout);
this.herders = herders;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 2f2aa762038..d9bfaecc3bf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -66,10 +66,10 @@ import
org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.Message;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -717,7 +717,7 @@ public class Worker {
.map(this::taskTransactionalId)
.collect(Collectors.toList());
FenceProducersOptions fencingOptions = new
FenceProducersOptions()
- .timeoutMs((int)
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ .timeoutMs((int)
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
return admin.fenceProducers(transactionalIds,
fencingOptions).all().whenComplete((ignored, error) -> {
if (error == null)
log.debug("Finished fencing out {} task producers
for source connector {}", numTasks, connName);
@@ -1194,7 +1194,7 @@ public class Worker {
Admin admin = adminFactory.apply(adminConfig);
try {
ListConsumerGroupOffsetsOptions listOffsetsOptions = new
ListConsumerGroupOffsetsOptions()
- .timeoutMs((int)
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ .timeoutMs((int)
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult =
admin.listConsumerGroupOffsets(groupId, listOffsetsOptions);
listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().whenComplete((result,
error) -> {
if (error != null) {
@@ -1298,7 +1298,7 @@ public class Worker {
Map<Map<String, ?>, Map<String, ?>>
offsets, ClassLoader connectorLoader, Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
- Timer timer =
time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+ Timer timer =
time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
boolean isReset = offsets == null;
SinkConnectorConfig sinkConnectorConfig = new
SinkConnectorConfig(plugins, connectorConfig);
Class<? extends Connector> sinkConnectorClass =
connector.getClass();
@@ -1529,7 +1529,7 @@ public class Worker {
ClassLoader connectorLoader,
Callback<Message> cb) {
executor.submit(plugins.withClassLoader(connectorLoader, () -> {
try {
- Timer timer =
time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS));
+ Timer timer =
time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS));
// This reads to the end of the offsets topic and can be a
potentially time-consuming operation
offsetStore.start();
updateTimerAndCheckExpiry(timer, "Timed out while trying to
read to the end of the offsets topic prior to modifying " +
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
index ca9eb731c0c..d7bb40e1dcd 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -17,12 +17,12 @@
package org.apache.kafka.connect.runtime.rest;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import
org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource;
import org.apache.kafka.connect.runtime.rest.resources.LoggingResource;
import org.apache.kafka.connect.runtime.rest.resources.RootResource;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import java.util.Arrays;
@@ -45,25 +45,35 @@ public class ConnectRestServer extends RestServer {
}
@Override
- protected Collection<ConnectResource> regularResources() {
+ protected Collection<Class<?>> regularResources() {
return Arrays.asList(
- new RootResource(herder),
- new ConnectorsResource(herder, config, restClient),
- new InternalConnectResource(herder, restClient),
- new ConnectorPluginsResource(herder)
+ RootResource.class,
+ ConnectorsResource.class,
+ InternalConnectResource.class,
+ ConnectorPluginsResource.class
);
}
@Override
- protected Collection<ConnectResource> adminResources() {
+ protected Collection<Class<?>> adminResources() {
return Arrays.asList(
- new LoggingResource()
+ LoggingResource.class
);
}
@Override
protected void configureRegularResources(ResourceConfig resourceConfig) {
registerRestExtensions(herder, resourceConfig);
+ resourceConfig.register(new Binder());
+ }
+
+ private class Binder extends AbstractBinder {
+ @Override
+ protected void configure() {
+ bind(herder).to(Herder.class);
+ bind(restClient).to(RestClient.class);
+ bind(config).to(RestServerConfig.class);
+ }
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
index 4dfd093bc2e..9af285c10fb 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java
@@ -37,19 +37,11 @@ public class HerderRequestHandler {
private static final Logger log =
LoggerFactory.getLogger(HerderRequestHandler.class);
private final RestClient restClient;
+ private final RestRequestTimeout requestTimeout;
- private long requestTimeoutMs;
-
- public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) {
+ public HerderRequestHandler(RestClient restClient, RestRequestTimeout
requestTimeout) {
this.restClient = restClient;
- this.requestTimeoutMs = requestTimeoutMs;
- }
-
- public void requestTimeoutMs(long requestTimeoutMs) {
- if (requestTimeoutMs < 1) {
- throw new IllegalArgumentException("REST request timeout must be
positive");
- }
- this.requestTimeoutMs = requestTimeoutMs;
+ this.requestTimeout = requestTimeout;
}
/**
@@ -61,7 +53,7 @@ public class HerderRequestHandler {
*/
public <T> T completeRequest(FutureCallback<T> cb) throws Throwable {
try {
- return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS);
+ return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw e.getCause();
} catch (TimeoutException e) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
similarity index 59%
copy from
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
copy to
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
index c4987150dfb..d2ce28cc472 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java
@@ -14,26 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.connect.runtime.rest.resources;
+package org.apache.kafka.connect.runtime.rest;
-import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.rest.RestClient;
+public interface RestRequestTimeout {
-import javax.ws.rs.Path;
-
-@Path("/connectors")
-public class InternalConnectResource extends InternalClusterResource {
-
- private final Herder herder;
-
- public InternalConnectResource(Herder herder, RestClient restClient) {
- super(restClient);
- this.herder = herder;
- }
-
- @Override
- protected Herder herderForRequest() {
- return herder;
- }
+ /**
+ * @return the current timeout that should be used for REST requests, in
milliseconds
+ */
+ long timeoutMs();
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 8cd3583fbd7..6a7d327f43b 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl;
import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.rest.util.SSLUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.CustomRequestLog;
@@ -43,6 +42,8 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.eclipse.jetty.servlets.HeaderFilter;
import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.glassfish.hk2.utilities.Binder;
+import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.servlet.ServletContainer;
@@ -59,6 +60,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -66,6 +68,13 @@ import java.util.regex.Pattern;
* Embedded server for the REST API that provides the control plane for Kafka
Connect workers.
*/
public abstract class RestServer {
+
+ // TODO: This should not be so long. However, due to potentially long
rebalances that may have to wait a full
+ // session timeout to complete, during which we cannot serve some
requests. Ideally we could reduce this, but
+ // we need to consider all possible scenarios this could fail. It might be
ok to fail with a timeout in rare cases,
+ // but currently a worker simply leaving the group can take this long as
well.
+ public static final long DEFAULT_REST_REQUEST_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(90);
+
private static final Logger log =
LoggerFactory.getLogger(RestServer.class);
// Used to distinguish between Admin connectors and regular REST API
connectors when binding admin handlers
@@ -80,8 +89,8 @@ public abstract class RestServer {
protected final RestServerConfig config;
private final ContextHandlerCollection handlers;
private final Server jettyServer;
+ private final RequestTimeout requestTimeout;
- private Collection<ConnectResource> resources;
private List<ConnectRestExtension> connectRestExtensions =
Collections.emptyList();
/**
@@ -95,6 +104,7 @@ public abstract class RestServer {
jettyServer = new Server();
handlers = new ContextHandlerCollection();
+ requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
createConnectors(listeners, adminListeners);
}
@@ -207,44 +217,31 @@ public abstract class RestServer {
protected final void initializeResources() {
log.info("Initializing REST resources");
- resources = new ArrayList<>();
-
- ResourceConfig resourceConfig = new ResourceConfig();
- resourceConfig.register(new JacksonJsonProvider());
- Collection<ConnectResource> regularResources = regularResources();
+ ResourceConfig resourceConfig = newResourceConfig();
+ Collection<Class<?>> regularResources = regularResources();
regularResources.forEach(resourceConfig::register);
- resources.addAll(regularResources);
-
- resourceConfig.register(ConnectExceptionMapper.class);
- resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
-
configureRegularResources(resourceConfig);
List<String> adminListeners = config.adminListeners();
ResourceConfig adminResourceConfig;
- if (adminListeners == null) {
- log.info("Adding admin resources to main listener");
- adminResourceConfig = resourceConfig;
- Collection<ConnectResource> adminResources = adminResources();
- resources.addAll(adminResources);
- adminResources.forEach(adminResourceConfig::register);
- configureAdminResources(adminResourceConfig);
- } else if (adminListeners.size() > 0) {
- // TODO: we need to check if these listeners are same as
'listeners'
- // TODO: the following code assumes that they are different
- log.info("Adding admin resources to admin listener");
- adminResourceConfig = new ResourceConfig();
- adminResourceConfig.register(new JacksonJsonProvider());
- Collection<ConnectResource> adminResources = adminResources();
- resources.addAll(adminResources);
- adminResources.forEach(adminResourceConfig::register);
- adminResourceConfig.register(ConnectExceptionMapper.class);
- configureAdminResources(adminResourceConfig);
- } else {
+ if (adminListeners != null && adminListeners.isEmpty()) {
log.info("Skipping adding admin resources");
// set up adminResource but add no handlers to it
adminResourceConfig = resourceConfig;
+ } else {
+ if (adminListeners == null) {
+ log.info("Adding admin resources to main listener");
+ adminResourceConfig = resourceConfig;
+ } else {
+ // TODO: we need to check if these listeners are same as
'listeners'
+ // TODO: the following code assumes that they are different
+ log.info("Adding admin resources to admin listener");
+ adminResourceConfig = newResourceConfig();
+ }
+ Collection<Class<?>> adminResources = adminResources();
+ adminResources.forEach(adminResourceConfig::register);
+ configureAdminResources(adminResourceConfig);
}
ServletContainer servletContainer = new
ServletContainer(resourceConfig);
@@ -302,17 +299,26 @@ public abstract class RestServer {
log.info("REST resources initialized; server is started and ready to
handle requests");
}
+ private ResourceConfig newResourceConfig() {
+ ResourceConfig result = new ResourceConfig();
+ result.register(new JacksonJsonProvider());
+ result.register(requestTimeout.binder());
+ result.register(ConnectExceptionMapper.class);
+ result.property(ServerProperties.WADL_FEATURE_DISABLE, true);
+ return result;
+ }
+
/**
- * @return the {@link ConnectResource resources} that should be registered
with the
+ * @return the resources that should be registered with the
* standard (i.e., non-admin) listener for this server; may be empty, but
not null
*/
- protected abstract Collection<ConnectResource> regularResources();
+ protected abstract Collection<Class<?>> regularResources();
/**
- * @return the {@link ConnectResource resources} that should be registered
with the
+ * @return the resources that should be registered with the
* admin listener for this server; may be empty, but not null
*/
- protected abstract Collection<ConnectResource> adminResources();
+ protected abstract Collection<Class<?>> adminResources();
/**
* Pluggable hook to customize the regular (i.e., non-admin) resources on
this server
@@ -426,7 +432,7 @@ public abstract class RestServer {
// For testing only
public void requestTimeout(long requestTimeoutMs) {
- this.resources.forEach(resource ->
resource.requestTimeout(requestTimeoutMs));
+ this.requestTimeout.timeoutMs(requestTimeoutMs);
}
String determineAdvertisedProtocol() {
@@ -476,7 +482,7 @@ public abstract class RestServer {
config.restExtensions(),
config, ConnectRestExtension.class);
- long herderRequestTimeoutMs =
ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+ long herderRequestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs();
@@ -508,4 +514,35 @@ public abstract class RestServer {
headerFilterHolder.setInitParameter("headerConfig", headerConfig);
context.addFilter(headerFilterHolder, "/*",
EnumSet.of(DispatcherType.REQUEST));
}
+
+ private static class RequestTimeout implements RestRequestTimeout {
+
+ private final RequestBinder binder;
+ private volatile long timeoutMs;
+
+ public RequestTimeout(long initialTimeoutMs) {
+ this.timeoutMs = initialTimeoutMs;
+ this.binder = new RequestBinder();
+ }
+
+ @Override
+ public long timeoutMs() {
+ return timeoutMs;
+ }
+
+ public void timeoutMs(long timeoutMs) {
+ this.timeoutMs = timeoutMs;
+ }
+
+ public Binder binder() {
+ return binder;
+ }
+
+ private class RequestBinder extends AbstractBinder {
+ @Override
+ protected void configure() {
+ bind(RequestTimeout.this).to(RestRequestTimeout.class);
+ }
+ }
+ }
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
deleted file mode 100644
index 49d61a727a9..00000000000
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime.rest.resources;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * This interface defines shared logic for all Connect REST resources.
- */
-public interface ConnectResource {
-
- // TODO: This should not be so long. However, due to potentially long
rebalances that may have to wait a full
- // session timeout to complete, during which we cannot serve some
requests. Ideally we could reduce this, but
- // we need to consider all possible scenarios this could fail. It might be
ok to fail with a timeout in rare cases,
- // but currently a worker simply leaving the group can take this long as
well.
- long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90);
-
- /**
- * Set how long the resource will await the completion of each request
before returning a 500 error.
- * If the resource does not perform any operations that can be expected to
block under reasonable
- * circumstances, this can be implemented as a no-op.
- * @param requestTimeoutMs the new timeout in milliseconds; must be
positive
- */
- void requestTimeout(long requestTimeoutMs);
-
-}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 037d98b68e6..cb265717bb4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -22,12 +22,14 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.PluginInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.FutureCallback;
+import javax.inject.Inject;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@@ -53,17 +55,18 @@ import java.util.stream.Collectors;
@Path("/connector-plugins")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorPluginsResource implements ConnectResource {
+public class ConnectorPluginsResource {
private static final String ALIAS_SUFFIX = "Connector";
private final Herder herder;
private final Set<PluginInfo> connectorPlugins;
- private long requestTimeoutMs;
+ private final RestRequestTimeout requestTimeout;
- public ConnectorPluginsResource(Herder herder) {
+ @Inject
+ public ConnectorPluginsResource(Herder herder, RestRequestTimeout
requestTimeout) {
this.herder = herder;
+ this.requestTimeout = requestTimeout;
this.connectorPlugins = new LinkedHashSet<>();
- this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS;
// TODO: improve once plugins are allowed to be added/removed during
runtime.
addConnectorPlugins(herder.plugins().sinkConnectors());
@@ -80,11 +83,6 @@ public class ConnectorPluginsResource implements
ConnectResource {
.forEach(connectorPlugins::add);
}
- @Override
- public void requestTimeout(long requestTimeoutMs) {
- this.requestTimeoutMs = requestTimeoutMs;
- }
-
@PUT
@Path("/{pluginName}/config/validate")
@Operation(summary = "Validate the provided configuration against the
configuration definition for the specified pluginName")
@@ -106,7 +104,7 @@ public class ConnectorPluginsResource implements
ConnectResource {
herder.validateConnectorConfig(connectorConfig, validationCallback,
false);
try {
- return validationCallback.get(requestTimeoutMs,
TimeUnit.MILLISECONDS);
+ return validationCallback.get(requestTimeout.timeoutMs(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// This timeout is for the operation itself. None of the timeout
error codes are relevant, so internal server
// error is the best option
@@ -117,7 +115,6 @@ public class ConnectorPluginsResource implements
ConnectResource {
}
@GET
- @Path("/")
@Operation(summary = "List all connector plugins installed")
public List<PluginInfo> listConnectorPlugins(
@DefaultValue("true") @QueryParam("connectorsOnly")
@Parameter(description = "Whether to list only connectors instead of all
plugins") boolean connectorsOnly
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 646fb039256..dc911106a1a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -39,6 +40,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Inject;
import javax.servlet.ServletContext;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
@@ -70,7 +72,7 @@ import static
org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Transla
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class ConnectorsResource implements ConnectResource {
+public class ConnectorsResource {
private static final Logger log =
LoggerFactory.getLogger(ConnectorsResource.class);
private final Herder herder;
@@ -80,20 +82,20 @@ public class ConnectorsResource implements ConnectResource {
private final boolean isTopicTrackingDisabled;
private final boolean isTopicTrackingResetDisabled;
- public ConnectorsResource(Herder herder, RestServerConfig config,
RestClient restClient) {
+ @Inject
+ public ConnectorsResource(
+ Herder herder,
+ RestServerConfig config,
+ RestClient restClient,
+ RestRequestTimeout requestTimeout
+ ) {
this.herder = herder;
- this.requestHandler = new HerderRequestHandler(restClient,
DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ this.requestHandler = new HerderRequestHandler(restClient,
requestTimeout);
this.isTopicTrackingDisabled = !config.topicTrackingEnabled();
this.isTopicTrackingResetDisabled =
!config.topicTrackingResetEnabled();
}
- @Override
- public void requestTimeout(long requestTimeoutMs) {
- requestHandler.requestTimeoutMs(requestTimeoutMs);
- }
-
@GET
- @Path("/")
@Operation(summary = "List all active connectors")
public Response listConnectors(
final @Context UriInfo uriInfo,
@@ -131,7 +133,6 @@ public class ConnectorsResource implements ConnectResource {
}
@POST
- @Path("/")
@Operation(summary = "Create a new connector")
public Response createConnector(final @Parameter(hidden = true)
@QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
index c7bef991b41..5ee1d232a29 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.distributed.Crypto;
import org.apache.kafka.connect.runtime.rest.HerderRequestHandler;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
import org.apache.kafka.connect.util.FutureCallback;
import javax.ws.rs.POST;
@@ -45,7 +46,7 @@ import java.util.Map;
* requests that originate from a user and are forwarded from one worker to
another.
*/
@Produces(MediaType.APPLICATION_JSON)
-public abstract class InternalClusterResource implements ConnectResource {
+public abstract class InternalClusterResource {
private static final TypeReference<List<Map<String, String>>>
TASK_CONFIGS_TYPE =
new TypeReference<List<Map<String, String>>>() { };
@@ -56,13 +57,8 @@ public abstract class InternalClusterResource implements
ConnectResource {
@Context
UriInfo uriInfo;
- protected InternalClusterResource(RestClient restClient) {
- this.requestHandler = new HerderRequestHandler(restClient,
DEFAULT_REST_REQUEST_TIMEOUT_MS);
- }
-
- @Override
- public void requestTimeout(long requestTimeoutMs) {
- requestHandler.requestTimeoutMs(requestTimeoutMs);
+ protected InternalClusterResource(RestClient restClient,
RestRequestTimeout requestTimeout) {
+ this.requestHandler = new HerderRequestHandler(restClient,
requestTimeout);
}
/**
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
index c4987150dfb..228c7cd67ba 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java
@@ -18,7 +18,9 @@ package org.apache.kafka.connect.runtime.rest.resources;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
+import javax.inject.Inject;
import javax.ws.rs.Path;
@Path("/connectors")
@@ -26,8 +28,9 @@ public class InternalConnectResource extends
InternalClusterResource {
private final Herder herder;
- public InternalConnectResource(Herder herder, RestClient restClient) {
- super(restClient);
+ @Inject
+ public InternalConnectResource(Herder herder, RestClient restClient,
RestRequestTimeout requestTimeout) {
+ super(restClient, requestTimeout);
this.herder = herder;
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
index 812cf696563..36ac2538733 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
@@ -46,25 +46,19 @@ import java.util.TreeMap;
@Path("/admin/loggers")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class LoggingResource implements ConnectResource {
+public class LoggingResource {
/**
* Log4j uses "root" (case-insensitive) as name of the root logger.
*/
private static final String ROOT_LOGGER_NAME = "root";
- @Override
- public void requestTimeout(long requestTimeoutMs) {
- // No-op
- }
-
/**
* List the current loggers that have their levels explicitly set and
their log levels.
*
* @return a list of current loggers and their levels.
*/
@GET
- @Path("/")
@Operation(summary = "List the current loggers that have their levels
explicitly set and their log levels")
public Response listLoggers() {
Map<String, Map<String, String>> loggers = new TreeMap<>();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index fe09e269039..b112c59b820 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -20,6 +20,7 @@ import io.swagger.v3.oas.annotations.Operation;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
+import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -27,21 +28,16 @@ import javax.ws.rs.core.MediaType;
@Path("/")
@Produces(MediaType.APPLICATION_JSON)
-public class RootResource implements ConnectResource {
+public class RootResource {
private final Herder herder;
+ @Inject
public RootResource(Herder herder) {
this.herder = herder;
}
- @Override
- public void requestTimeout(long requestTimeoutMs) {
- // No-op
- }
-
@GET
- @Path("/")
@Operation(summary = "Get details about this Connect worker and the id of
the Kafka cluster it is connected to")
public ServerInfo serverInfo() {
return new ServerInfo(herder.kafkaClusterId());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 33614e317c4..934a3177a24 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -63,6 +62,7 @@ import java.util.stream.IntStream;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static
org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -346,7 +346,7 @@ public class BlockingConnectorTest {
exception.getMessage().contains("Request timed out")
);
// Reset the REST request timeout so that other requests aren't
impacted
-
connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS);
+ connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
}
private static class Block {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 0de444be202..f88729af868 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -60,10 +60,10 @@ import
org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.Message;
-import org.apache.kafka.connect.runtime.rest.resources.ConnectResource;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -2364,7 +2364,7 @@ public class WorkerTest {
// Expect the call to Admin::deleteConsumerGroups to have a timeout
value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS
// minus the delay introduced in the call to
Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call
to
// SinkConnector::alterOffsets (3000 ms)
- assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS -
2000L - 3000L,
+ assertEquals((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L
- 3000L,
deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue());
verify(admin, timeout(1000)).close();
verifyKafkaClusterId();
@@ -2416,7 +2416,7 @@ public class WorkerTest {
when(plugins.withClassLoader(any(ClassLoader.class),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sourceConnector.alterOffsets(eq(connectorProps),
anyMap())).thenAnswer(invocation -> {
- time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
+ time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true;
});
ConnectorOffsetBackingStore offsetStore =
mock(ConnectorOffsetBackingStore.class);
@@ -2454,7 +2454,7 @@ public class WorkerTest {
when(plugins.withClassLoader(any(ClassLoader.class),
any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg());
when(sinkConnector.alterOffsets(eq(connectorProps),
anyMap())).thenAnswer(invocation -> {
- time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
+ time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000);
return true;
});
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
index 2b78900bc25..3eff0d335cb 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java
@@ -39,6 +39,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;
@@ -62,13 +63,13 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ConnectRestServerTest {
- private Herder herder;
- private Plugins plugins;
+ @Mock private RestClient restClient;
+ @Mock private Herder herder;
+ @Mock private Plugins plugins;
private ConnectRestServer server;
private CloseableHttpClient httpClient;
private Collection<CloseableHttpResponse> responses = new ArrayList<>();
@@ -77,8 +78,6 @@ public class ConnectRestServerTest {
@Before
public void setUp() {
- herder = mock(Herder.class);
- plugins = mock(Plugins.class);
httpClient = HttpClients.createMinimal();
}
@@ -117,7 +116,7 @@ public class ConnectRestServerTest {
Map<String, String> configMap = new HashMap<>(baseServerProps());
configMap.put(RestServerConfig.LISTENERS_CONFIG,
"http://localhost:8080,https://localhost:8443");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://localhost:8080/",
server.advertisedUrl().toString());
server.stop();
@@ -126,7 +125,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.LISTENERS_CONFIG,
"http://localhost:8080,https://localhost:8443");
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG,
"https");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("https://localhost:8443/",
server.advertisedUrl().toString());
server.stop();
@@ -134,7 +133,7 @@ public class ConnectRestServerTest {
configMap = new HashMap<>(baseServerProps());
configMap.put(RestServerConfig.LISTENERS_CONFIG,
"https://localhost:8443");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("https://localhost:8443/",
server.advertisedUrl().toString());
server.stop();
@@ -145,7 +144,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG,
"somehost");
configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://somehost:10000/",
server.advertisedUrl().toString());
server.stop();
@@ -154,7 +153,7 @@ public class ConnectRestServerTest {
configMap.put(RestServerConfig.LISTENERS_CONFIG,
"https://encrypted-localhost:42069,http://plaintext-localhost:4761");
configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG,
"http");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
Assert.assertEquals("http://plaintext-localhost:4761/",
server.advertisedUrl().toString());
server.stop();
}
@@ -167,7 +166,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -194,7 +193,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
URI serverUrl = server.advertisedUrl();
@@ -237,7 +236,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
@@ -257,7 +256,7 @@ public class ConnectRestServerTest {
// create some loggers in the process
LoggerFactory.getLogger("a.b.c.s.W");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -290,7 +289,7 @@ public class ConnectRestServerTest {
LoggerFactory.getLogger("a.b.c.p.Y");
LoggerFactory.getLogger("a.b.c.p.Z");
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -312,7 +311,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -331,7 +330,7 @@ public class ConnectRestServerTest {
doReturn(plugins).when(herder).plugins();
expectEmptyRestExtensions();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
@@ -377,7 +376,7 @@ public class ConnectRestServerTest {
expectEmptyRestExtensions();
doReturn(Arrays.asList("a", "b")).when(herder).connectors();
- server = new ConnectRestServer(null, null, configMap);
+ server = new ConnectRestServer(null, restClient, configMap);
server.initializeServer();
server.initializeResources(herder);
HttpRequest request = new HttpGet("/connectors");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 52ac14ca1cd..589dfb29bd0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -39,6 +39,8 @@ import
org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
+import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
@@ -208,7 +210,8 @@ public class ConnectorPluginsResourceTest {
doReturn(HEADER_CONVERTER_PLUGINS).when(plugins).headerConverters();
doReturn(TRANSFORMATION_PLUGINS).when(plugins).transformations();
doReturn(PREDICATE_PLUGINS).when(plugins).predicates();
- connectorPluginsResource = new ConnectorPluginsResource(herder);
+ RestRequestTimeout requestTimeout = () ->
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+ connectorPluginsResource = new ConnectorPluginsResource(herder,
requestTimeout);
}
@Test
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 0183e251600..14010b3120a 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -27,6 +27,8 @@ import
org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestRequestTimeout;
+import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.RestServerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -145,6 +147,8 @@ public class ConnectorsResourceTest {
private static final Set<String> CONNECTOR2_ACTIVE_TOPICS = new HashSet<>(
Arrays.asList("foo_topic", "baz_topic"));
+ private static final RestRequestTimeout REQUEST_TIMEOUT = () ->
RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS;
+
@Mock
private Herder herder;
private ConnectorsResource connectorsResource;
@@ -158,7 +162,7 @@ public class ConnectorsResourceTest {
public void setUp() throws NoSuchMethodException {
when(serverConfig.topicTrackingEnabled()).thenReturn(true);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
forward = mock(UriInfo.class);
MultivaluedMap<String, String> queryParams = new
MultivaluedHashMap<>();
queryParams.putSingle("forward", "true");
@@ -696,7 +700,7 @@ public class ConnectorsResourceTest {
public void testConnectorActiveTopicsWithTopicTrackingDisabled() {
when(serverConfig.topicTrackingEnabled()).thenReturn(false);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class,
() -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME));
@@ -708,7 +712,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingEnabled()).thenReturn(false);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class,
() ->
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -720,7 +724,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingEnabled()).thenReturn(true);
when(serverConfig.topicTrackingResetEnabled()).thenReturn(false);
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
Exception e = assertThrows(ConnectRestException.class,
() ->
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers));
@@ -733,7 +737,7 @@ public class ConnectorsResourceTest {
when(serverConfig.topicTrackingResetEnabled()).thenReturn(true);
when(herder.connectorActiveTopics(CONNECTOR_NAME))
.thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME,
CONNECTOR_ACTIVE_TOPICS));
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
Response response =
connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME);
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
@@ -746,7 +750,7 @@ public class ConnectorsResourceTest {
@Test
public void testResetConnectorActiveTopics() {
HttpHeaders headers = mock(HttpHeaders.class);
- connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient);
+ connectorsResource = new ConnectorsResource(herder, serverConfig,
restClient, REQUEST_TIMEOUT);
Response response =
connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers);
verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
index 5bf33bf5300..0dff57fb593 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
+import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.util.Callback;
import org.junit.Before;
import org.junit.Test;
@@ -74,7 +75,7 @@ public class InternalConnectResourceTest {
@Before
public void setup() {
- internalResource = new InternalConnectResource(herder, restClient);
+ internalResource = new InternalConnectResource(herder, restClient, ()
-> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS);
internalResource.uriInfo = uriInfo;
}