This is an automated email from the ASF dual-hosted git repository. kkarantasis pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 2d92d2d KAFKA-9888: Copy connector configs before passing to REST extensions (#8511) 2d92d2d is described below commit 2d92d2d73b218a54ca324a26a475f0b0d67d5693 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Sat May 23 15:35:43 2020 -0700 KAFKA-9888: Copy connector configs before passing to REST extensions (#8511) The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate into the worker's own view of the connector configuration. In this commit the map is copied before it's returned to REST extensions. An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original. 
Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <konstant...@confluent.io> --- .../kafka/connect/runtime/health/ConnectClusterStateImpl.java | 2 +- .../apache/kafka/connect/storage/KafkaConfigBackingStore.java | 4 ++-- .../connect/runtime/health/ConnectClusterStateImplTest.java | 9 ++++++++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java index 38362b3..6b7285d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java @@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState { FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>(); herder.connectorConfig(connName, connectorConfigCallback); try { - return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS); + return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS)); } catch (InterruptedException | ExecutionException | TimeoutException e) { throw new ConnectException( String.format("Failed to retrieve configuration for connector '%s'", connName), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java index 39a8f35..658d6c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java @@ -280,8 +280,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore { @Override public ClusterConfigState snapshot() { 
synchronized (lock) { - // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be - // immutable configs + // Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view + // of the config topic, any nested structures should be copied before making modifications return new ClusterConfigState( offset, sessionKey, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java index d8984f0..d8a7e49 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertThrows; @RunWith(PowerMockRunner.class) @@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest { } }); EasyMock.replay(herder); - assertEquals(expectedConfig, connectClusterState.connectorConfig(connName)); + Map<String, String> actualConfig = connectClusterState.connectorConfig(connName); + assertEquals(expectedConfig, actualConfig); + assertNotSame( + "Config should be copied in order to avoid mutation by REST extensions", + expectedConfig, + actualConfig + ); } @Test