This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26119bae901 KAFKA-14463 Close ConnectorClientConfigOverridePolicy instances (#13144)
26119bae901 is described below

commit 26119bae901ab83bc7105dfb1e8f099a6778344b
Author: Nikolay <[email protected]>
AuthorDate: Tue Jan 24 05:38:37 2023 +0300

    KAFKA-14463 Close ConnectorClientConfigOverridePolicy instances (#13144)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   |  8 +++---
 .../kafka/connect/runtime/AbstractHerder.java      |  2 ++
 ...aseConnectorClientConfigOverridePolicyTest.java |  2 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 13 +++++++++
 .../runtime/distributed/DistributedHerderTest.java |  9 +++---
 .../SampleConnectorClientConfigOverridePolicy.java | 32 ++++++++++++++++++++++
 .../runtime/standalone/StandaloneHerderTest.java   | 10 +++----
 7 files changed, 60 insertions(+), 16 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 4eab5e629e7..ae39c6cd79a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -90,8 +90,6 @@ public class MirrorMaker {
     private static final Logger log = 
LoggerFactory.getLogger(MirrorMaker.class);
 
     private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;
-    private static final ConnectorClientConfigOverridePolicy 
CLIENT_CONFIG_OVERRIDE_POLICY =
-            new AllConnectorClientConfigOverridePolicy();
 
     private static final List<Class<?>> CONNECTOR_CLASSES = Arrays.asList(
         MirrorSourceConnector.class,
@@ -241,7 +239,9 @@ public class MirrorMaker {
         SharedTopicAdmin sharedAdmin = new SharedTopicAdmin(adminProps);
         KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase);
         offsetBackingStore.configure(distributedConfig);
-        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
+        ConnectorClientConfigOverridePolicy clientConfigOverridePolicy = new 
AllConnectorClientConfigOverridePolicy();
+        clientConfigOverridePolicy.configure(config.originals());
+        Worker worker = new Worker(workerId, time, plugins, distributedConfig, 
offsetBackingStore, clientConfigOverridePolicy);
         WorkerConfigTransformer configTransformer = worker.configTransformer();
         Converter internalValueConverter = worker.getInternalValueConverter();
         StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter, sharedAdmin, 
clientIdBase);
@@ -258,7 +258,7 @@ public class MirrorMaker {
         // Do not provide a restClient to the DistributedHerder to indicate 
that request forwarding is disabled
         Herder herder = new DistributedHerder(distributedConfig, time, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl, null, CLIENT_CONFIG_OVERRIDE_POLICY, 
sharedAdmin);
+                advertisedUrl, null, clientConfigOverridePolicy, sharedAdmin);
         herders.put(sourceAndTarget, herder);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 53bf8757d6a..15fb23d35a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigDef.ConfigKey;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;
@@ -151,6 +152,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
         this.configBackingStore.stop();
         this.worker.stop();
         this.connectorExecutor.shutdown();
+        Utils.closeQuietly(this.connectorClientConfigOverridePolicy, 
"connector client config override policy");
     }
 
     @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
index 719de7ed7b7..9af731e7ac5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
 
 public abstract class BaseConnectorClientConfigOverridePolicyTest {
 
-    protected abstract ConnectorClientConfigOverridePolicy  policyToTest();
+    protected abstract ConnectorClientConfigOverridePolicy policyToTest();
 
     protected void testValidOverride(Map<String, Object> clientConfig) {
         List<ConfigValue> configValues = configValues(clientConfig);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index eddabba8012..2e5a82f4f98 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -32,6 +32,7 @@ import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverri
 import 
org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import 
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -158,6 +159,18 @@ public class AbstractHerderTest {
         assertEquals(Collections.singleton(CONN1), new 
HashSet<>(herder.connectors()));
     }
 
+    @Test
+    public void testConnectorClientConfigOverridePolicyClose() {
+        SampleConnectorClientConfigOverridePolicy 
noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
+
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+            .useConstructor(worker, workerId, kafkaClusterId, statusStore, 
configStore, noneConnectorClientConfigOverridePolicy)
+            .defaultAnswer(CALLS_REAL_METHODS));
+
+        herder.stopServices();
+        assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
+    }
+
     @Test
     public void testConnectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index af78ce4b260..e4bdc52b223 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.utils.MockTime;
-import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -228,9 +226,8 @@ public class DistributedHerderTest {
     private SinkConnectorConfig conn1SinkConfig;
     private SinkConnectorConfig conn1SinkConfigUpdated;
     private short connectProtocolVersion;
-    private final ConnectorClientConfigOverridePolicy
-        noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy();
-
+    private final SampleConnectorClientConfigOverridePolicy
+        noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
     public void setUp() throws Exception {
@@ -3556,6 +3553,7 @@ public class DistributedHerderTest {
     public void testHerderStopServicesClosesUponShutdown() {
         assertEquals(1, shutdownCalled.getCount());
         herder.stopServices();
+        assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
         assertEquals(0, shutdownCalled.getCount());
     }
 
@@ -3833,6 +3831,7 @@ public class DistributedHerderTest {
         herderExecutor.shutdown();
         assertTrue("herder thread did not finish in time", 
herderExecutor.awaitTermination(10, TimeUnit.SECONDS));
         herderFuture.get();
+        assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
     }
 
     private void expectHerderStartup() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java
new file mode 100644
index 00000000000..fc3cf1c73df
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/SampleConnectorClientConfigOverridePolicy.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
+
+public class SampleConnectorClientConfigOverridePolicy extends 
NoneConnectorClientConfigOverridePolicy {
+    private boolean closed;
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 2852762acf9..b76d8f83824 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -21,8 +21,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.Task;
-import 
org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
-import 
org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -41,6 +39,7 @@ import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerConnector;
+import 
org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
@@ -119,10 +118,8 @@ public class StandaloneHerderTest {
     private LoaderSwap loaderSwap;
     protected FutureCallback<Herder.Created<ConnectorInfo>> createCallback;
     @Mock protected StatusBackingStore statusBackingStore;
-
-    private final ConnectorClientConfigOverridePolicy
-        noneConnectorClientConfigOverridePolicy = new 
NoneConnectorClientConfigOverridePolicy();
-
+    private final SampleConnectorClientConfigOverridePolicy
+        noneConnectorClientConfigOverridePolicy = new 
SampleConnectorClientConfigOverridePolicy();
 
     @Before
     public void setup() {
@@ -749,6 +746,7 @@ public class StandaloneHerderTest {
         assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
 
         herder.stop();
+        assertTrue(noneConnectorClientConfigOverridePolicy.isClosed());
 
         PowerMock.verifyAll();
     }

Reply via email to