This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7322f4cd55d MINOR: reformat ClusterConfigState constructions in 
Abstract & DistributedHerder (#13286)
7322f4cd55d is described below

commit 7322f4cd55dc08abdc6ccf51ed33f7f0d869dd0e
Author: Greg Harris <[email protected]>
AuthorDate: Tue Feb 28 09:50:44 2023 -0800

    MINOR: reformat ClusterConfigState constructions in Abstract & 
DistributedHerder (#13286)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../kafka/connect/runtime/AbstractHerderTest.java  |  28 +++-
 .../runtime/distributed/DistributedHerderTest.java | 157 ++++++++++++++++-----
 2 files changed, 146 insertions(+), 39 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 032f09322d7..7cb0b056b9f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -129,12 +129,28 @@ public class AbstractHerderTest {
         TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
         TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
-    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new 
ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_NO_TASKS = new 
ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 722be88fb7e..03aadd31166 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -193,15 +193,39 @@ public class DistributedHerderTest {
         TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
         TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
-    private static final ClusterConfigState SNAPSHOT = new 
ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new 
ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.PAUSED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
-    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = 
new ClusterConfigState(1, null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new 
ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG),
+            Collections.singletonMap(CONN1, TargetState.PAUSED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
+    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = 
new ClusterConfigState(
+            1,
+            null,
+            Collections.singletonMap(CONN1, 3),
+            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED),
+            Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP,
+            Collections.emptyMap(),
+            Collections.emptyMap(),
+            Collections.emptySet(),
+            Collections.emptySet());
 
     private static final String WORKER_ID = "localhost:8083";
     private static final String KAFKA_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@@ -623,9 +647,17 @@ public class DistributedHerderTest {
             // Same as SNAPSHOT, except with an updated offset
             // Allow the task to read to the end of the topic and complete the 
rebalance
             ClusterConfigState secondSnapshot = new ClusterConfigState(
-                configOffset, null, Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptySet(), Collections.emptySet());
+                    configOffset,
+                    null,
+                    Collections.singletonMap(CONN1, 3),
+                    Collections.singletonMap(CONN1, CONN1_CONFIG),
+                    Collections.singletonMap(CONN1, TargetState.STARTED),
+                    TASK_CONFIGS_MAP,
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    Collections.emptySet(),
+                    Collections.emptySet()
+            );
             expectConfigRefreshAndSnapshot(secondSnapshot);
         }
         member.requestRejoin();
@@ -1962,9 +1994,19 @@ public class DistributedHerderTest {
         member.ensureActive();
         PowerMock.expectLastCall();
         // During the next tick, throw an error from the transformer
-        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, 
null, Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), 
Collections.emptyMap(), Collections.emptySet(), Collections.emptySet(), 
configTransformer);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                configTransformer
+        );
         
EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithTransform);
         EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), 
EasyMock.anyObject()))
             .andThrow(new ConfigException("Simulated exception thrown during 
config transformation"));
@@ -2558,9 +2600,18 @@ public class DistributedHerderTest {
         EasyMock.expect(configTransformer.transform(EasyMock.eq(CONN1), 
EasyMock.anyObject()))
             .andThrow(new AssertionError("Config transformation should not 
occur when requesting connector or task info"));
         EasyMock.replay(configTransformer);
-        ClusterConfigState snapshotWithTransform = new ClusterConfigState(1, 
null, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet(), configTransformer);
+        ClusterConfigState snapshotWithTransform = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                configTransformer);
 
         expectConfigRefreshAndSnapshot(snapshotWithTransform);
 
@@ -2721,9 +2772,17 @@ public class DistributedHerderTest {
 
         expectRebalance(2, Collections.emptyList(), Collections.emptyList());
         SessionKey initialKey = new SessionKey(EasyMock.mock(SecretKey.class), 
0);
-        ClusterConfigState snapshotWithKey =  new ClusterConfigState(2, 
initialKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(
+                2,
+                initialKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
         // Second rebalance: poll indefinitely as worker is follower, so 
expiration still doesn't come into play
         member.poll(Long.MAX_VALUE);
@@ -2761,9 +2820,17 @@ public class DistributedHerderTest {
         
EasyMock.expect(initialSecretKey.getAlgorithm()).andReturn(DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT).anyTimes();
         EasyMock.expect(initialSecretKey.getEncoded()).andReturn(new 
byte[32]).anyTimes();
         SessionKey initialKey = new SessionKey(initialSecretKey, 
time.milliseconds());
-        ClusterConfigState snapshotWithKey =  new ClusterConfigState(1, 
initialKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithKey =  new ClusterConfigState(
+                1,
+                initialKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
         // First rebalance: poll for a limited time as worker is leader and 
must wake up for key expiration
         member.poll(leq(rotationTtlDelay));
@@ -2962,9 +3029,17 @@ public class DistributedHerderTest {
         
EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
         EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
         SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
-        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, 
sessionKey, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-            TASK_CONFIGS_MAP, Collections.emptyMap(), Collections.emptyMap(), 
Collections.emptySet(), Collections.emptySet());
+        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(
+                1,
+                sessionKey,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet());
 
         // First tick -- after joining the group, we try to write a new 
session key to
         // the config topic, and fail (in this case, we're trying to simulate 
that we've
@@ -3514,9 +3589,17 @@ public class DistributedHerderTest {
     @Test
     public void testVerifyTaskGeneration() {
         Map<String, Integer> taskConfigGenerations = new HashMap<>();
-        herder.configState = new ClusterConfigState(1, null, 
Collections.singletonMap(CONN1, 3),
-                Collections.singletonMap(CONN1, CONN1_CONFIG), 
Collections.singletonMap(CONN1, TargetState.STARTED),
-                TASK_CONFIGS_MAP, Collections.emptyMap(), 
taskConfigGenerations, Collections.emptySet(), Collections.emptySet());
+        herder.configState = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                taskConfigGenerations,
+                Collections.emptySet(),
+                Collections.emptySet());
 
         Callback<Void> verifyCallback = EasyMock.mock(Callback.class);
         for (int i = 0; i < 5; i++) {
@@ -3938,9 +4021,17 @@ public class DistributedHerderTest {
         Map<String, Map<String, String>> connectorConfigs = connectors.stream()
                 .collect(Collectors.toMap(Function.identity(), c -> 
CONN1_CONFIG));
 
-        return new ClusterConfigState(1, sessionKey, taskCounts,
-                connectorConfigs, Collections.singletonMap(CONN1, 
TargetState.STARTED),
-                taskConfigs, taskCountRecords, taskConfigGenerations, 
pendingFencing, Collections.emptySet());
+        return new ClusterConfigState(
+                1,
+                sessionKey,
+                taskCounts,
+                connectorConfigs,
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                taskConfigs,
+                taskCountRecords,
+                taskConfigGenerations,
+                pendingFencing,
+                Collections.emptySet());
     }
 
     private void expectExecuteTaskReconfiguration(boolean running, 
ConnectorConfig connectorConfig, IAnswer<List<Map<String, String>>> answer) {

Reply via email to