This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 2d92d2d  KAFKA-9888: Copy connector configs before passing to REST 
extensions (#8511)
2d92d2d is described below

commit 2d92d2d73b218a54ca324a26a475f0b0d67d5693
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Sat May 23 15:35:43 2020 -0700

    KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
    
    The changes made in KIP-454 involved adding a `connectorConfig` method to 
the ConnectClusterState interface that REST extensions could use to query the 
worker for the configuration of a given connector. The implementation for this 
method returns the Java `Map` that's stored in the worker's view of the config 
topic (when running in distributed mode). No copying is performed, which causes 
mutations of that `Map` to persist across invocations of `connectorConfig` and, 
even worse, propaga [...]
    
    In this commit the map is copied before it's returned to REST extensions.
    
    An existing unit test is modified to ensure that REST extensions receive a 
copy of the connector config, not the original.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis 
<konstant...@confluent.io>
---
 .../kafka/connect/runtime/health/ConnectClusterStateImpl.java    | 2 +-
 .../apache/kafka/connect/storage/KafkaConfigBackingStore.java    | 4 ++--
 .../connect/runtime/health/ConnectClusterStateImplTest.java      | 9 ++++++++-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index 38362b3..6b7285d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements 
ConnectClusterState {
         FutureCallback<Map<String, String>> connectorConfigCallback = new 
FutureCallback<>();
         herder.connectorConfig(connName, connectorConfigCallback);
         try {
-            return connectorConfigCallback.get(herderRequestTimeoutMs, 
TimeUnit.MILLISECONDS);
+            return new 
HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, 
TimeUnit.MILLISECONDS));
         } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
             throw new ConnectException(
                 String.format("Failed to retrieve configuration for connector 
'%s'", connName),
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 39a8f35..658d6c3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -280,8 +280,8 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     @Override
     public ClusterConfigState snapshot() {
         synchronized (lock) {
-            // Doing a shallow copy of the data is safe here because the 
complex nested data that is copied should all be
-            // immutable configs
+            // Only a shallow copy is performed here; in order to avoid 
accidentally corrupting the worker's view
+            // of the config topic, any nested structures should be copied 
before making modifications
             return new ClusterConfigState(
                     offset,
                     sessionKey,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index d8984f0..d8a7e49 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThrows;
 
 @RunWith(PowerMockRunner.class)
@@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest {
             }
         });
         EasyMock.replay(herder);
-        assertEquals(expectedConfig, 
connectClusterState.connectorConfig(connName));
+        Map<String, String> actualConfig = 
connectClusterState.connectorConfig(connName);
+        assertEquals(expectedConfig, actualConfig);
+        assertNotSame(
+            "Config should be copied in order to avoid mutation by REST 
extensions",
+            expectedConfig,
+            actualConfig
+        );
     }
 
     @Test

Reply via email to