This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 26119bae901 KAFKA-14463 Close ConnectorClientConfigOverridePolicy
instances (#13144)
26119bae901 is described below
commit 26119bae901ab83bc7105dfb1e8f099a6778344b
Author: Nikolay <[email protected]>
AuthorDate: Tue Jan 24 05:38:37 2023 +0300
KAFKA-14463 Close ConnectorClientConfigOverridePolicy instances (#13144)
Reviewers: Chris Egerton <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorMaker.java | 8 +++---
.../kafka/connect/runtime/AbstractHerder.java | 2 ++
...aseConnectorClientConfigOverridePolicyTest.java | 2 +-
.../kafka/connect/runtime/AbstractHerderTest.java | 13 +++++++++
.../runtime/distributed/DistributedHerderTest.java | 9 +++---
.../SampleConnectorClientConfigOverridePolicy.java | 32 ++++++++++++++++++++++
.../runtime/standalone/StandaloneHerderTest.java | 10 +++----
7 files changed, 60 insertions(+), 16 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 4eab5e629e7..ae39c6cd79a 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -90,8 +90,6 @@ public class MirrorMaker {
private static final Logger log =
LoggerFactory.getLogger(MirrorMaker.class);
private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
- private static final ConnectorClientConfigOverridePolicy
CLIENT_CONFIG_OVERRIDE_POLICY =
- new AllConnectorClientConfigOverridePolicy();
private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(
MirrorSourceConnector.class,
@@ -241,7 +239,9 @@ public class MirrorMaker {
SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
KafkaOffsetBackingStore offsetBackingStore = new
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
offsetBackingStore.configure(distributedConfig);
- Worker worker = new Worker(workerId, time, plugins, distributedConfig,
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+ ConnectorClientConfigOverridePolicy clientConfigOverridePolicy = new
AllConnectorClientConfigOverridePolicy();
+ clientConfigOverridePolicy.configure(config.originals());
+ Worker worker = new Worker(workerId, time, plugins, distributedConfig,
offsetBackingStore, clientConfigOverridePolicy);
WorkerConfigTransformer configTransformer = worker.configTransformer();
Converter internalValueConverter = worker.getInternalValueConverter();
StatusBackingStore statusBackingStore = new
KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin,
clientIdBase);
@@ -258,7 +258,7 @@ public class MirrorMaker {
// Do not provide a restClient to the DistributedHerder to indicate
that request forwarding is disabled
Herder herder = new DistributedHerder(distributedConfig, time, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
- advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY,
sharedAdmin);
+ advertisedUrl, null, clientConfigOverridePolicy, sharedAdmin);
herders.put(sourceAndTarget, herder);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 53bf8757d6a..15fb23d35a4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.ConfigKey;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
@@ -151,6 +152,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
this.configBackingStore.stop();
this.worker.stop();
this.connectorExecutor.shutdown();
+ Utils.closeQuietly(this.connectorClientConfigOverridePolicy,
"connector client config override policy");
}
@Override
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
index 719de7ed7b7..9af731e7ac5 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
public abstract class BaseConnectorClientConfigOverridePolicyTest {
- protected abstract ConnectorClientConfigOverridePolicy policyToTest();
+ protected abstract ConnectorClientConfigOverridePolicy policyToTest();
protected void testValidOverride(Map<String, Object> clientConfig) {
List<ConfigValue> configValues = configValues(clientConfig);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index eddabba8012..2e5a82f4f98 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -32,6 +32,7 @@ import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverri
import
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
+import
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -158,6 +159,18 @@ public class AbstractHerderTest {
assertEquals(Collections.singleton(CONN1), new
HashSet<>(herder.connectors()));
}
+ @Test
+ public void testConnectorClientConfigOverridePolicyClose() {
+ SampleConnectorClientConfigOverridePolicy
noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
+
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore,
configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
+
+ herder.stopServices();
+ assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
+ }
+
@Test
public void testConnectorStatus() {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index af78ce4b260..e4bdc52b223 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.utils.MockTime;
-import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -228,9 +226,8 @@ public class DistributedHerderTest {
private SinkConnectorConfig conn1SinkConfig;
private SinkConnectorConfig conn1SinkConfigUpdated;
private short connectProtocolVersion;
- private final ConnectorClientConfigOverridePolicy
- noneConnectorClientConfigOverridePolicy = new
NoneConnectorClientConfigOverridePolicy();
-
+ private final SampleConnectorClientConfigOverridePolicy
+ noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
public void setUp() throws Exception {
@@ -3556,6 +3553,7 @@ public class DistributedHerderTest {
public void testHerderStopServicesClosesUponShutdown() {
assertEquals(1, shutdownCalled.getCount());
herder.stopServices();
+ assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
assertEquals(0, shutdownCalled.getCount());
}
@@ -3833,6 +3831,7 @@ public class DistributedHerderTest {
herderExecutor.shutdown();
assertTrue("herder thread did not finish in time",
herderExecutor.awaitTermination(10, TimeUnit.SECONDS));
herderFuture.get();
+ assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
}
private void expectHerderStartup() {
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java
new file mode 100644
index 00000000000..fc3cf1c73df
--- /dev/null
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
+
+public class SampleConnectorClientConfigOverridePolicy extends
NoneConnectorClientConfigOverridePolicy {
+ private boolean closed;
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 2852762acf9..b76d8f83824 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.Task;
-import
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -41,6 +39,7 @@ import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.WorkerConnector;
+import
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -119,10 +118,8 @@ public class StandaloneHerderTest {
private LoaderSwap loaderSwap;
protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
@Mock protected StatusBackingStore statusBackingStore;
-
- private final ConnectorClientConfigOverridePolicy
- noneConnectorClientConfigOverridePolicy = new
NoneConnectorClientConfigOverridePolicy();
-
+ private final SampleConnectorClientConfigOverridePolicy
+ noneConnectorClientConfigOverridePolicy = new
SampleConnectorClientConfigOverridePolicy();
@Before
public void setup() {
@@ -749,6 +746,7 @@ public class StandaloneHerderTest {
assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
herder.stop();
+ assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
PowerMock.verifyAll();
}