This is an automated email from the ASF dual-hosted git repository. davidarthur pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit b8ba3c04db6a4e53d4904918774b19346ad535c8 Author: Jason Gustafson <[email protected]> AuthorDate: Thu Mar 4 12:19:34 2021 -0800 HOTFIX: Controller topic deletion should be atomic (#10264) Topic deletions should be atomic. This fixes a build error caused by merging of both https://github.com/apache/kafka/pull/10253 and https://github.com/apache/kafka/pull/10184 at about the same time. Reviewers: David Arthur <[email protected]> --- .../controller/ReplicationControlManager.java | 2 +- .../controller/ReplicationControlManagerTest.java | 24 ++++++++++++---------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 8bc0670..4a58b3a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -624,7 +624,7 @@ public class ReplicationControlManager { results.put(id, ApiError.fromThrowable(e)); } } - return new ControllerResult<>(records, results); + return ControllerResult.atomicOf(records, results); } void deleteTopic(Uuid id, List<ApiMessageAndVersion> records) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index da6e4af..8b00c10 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -67,6 +67,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(40) @@ -451,19 +452,19 @@ public class ReplicationControlManagerTest { unfenceBroker(0, ctx); registerBroker(1, ctx); unfenceBroker(1, ctx); - ControllerResult<CreateTopicsResponseData> result = + ControllerResult<CreateTopicsResponseData> createResult = replicationControl.createTopics(request); CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); - Uuid topicId = result.response().topics().find("foo").topicId(); + Uuid topicId = createResult.response().topics().find("foo").topicId(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). setNumPartitions(3).setReplicationFactor((short) 2). setErrorMessage(null).setErrorCode((short) 0). setTopicId(topicId)); - assertEquals(expectedResponse, result.response()); + assertEquals(expectedResponse, createResult.response()); // Until the records are replayed, no changes are made assertNull(replicationControl.getPartition(topicId, 0)); assertEmptyTopicConfigs(ctx, "foo"); - ctx.replay(result.records()); + ctx.replay(createResult.records()); assertNotNull(replicationControl.getPartition(topicId, 0)); assertNotNull(replicationControl.getPartition(topicId, 1)); assertNotNull(replicationControl.getPartition(topicId, 2)); @@ -483,17 +484,18 @@ public class ReplicationControlManagerTest { new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar"))); - ControllerResult<Map<Uuid, ApiError>> result1 = replicationControl. + ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl. deleteTopics(Collections.singletonList(invalidId)); - assertEquals(0, result1.records().size()); + assertEquals(0, invalidDeleteResult.records().size()); assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)), - result1.response()); - ControllerResult<Map<Uuid, ApiError>> result2 = replicationControl. + invalidDeleteResult.response()); + ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl. deleteTopics(Collections.singletonList(topicId)); + assertTrue(deleteResult.isAtomic()); assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)), - result2.response()); - assertEquals(1, result2.records().size()); - ctx.replay(result2.records()); + deleteResult.response()); + assertEquals(1, deleteResult.records().size()); + ctx.replay(deleteResult.records()); assertNull(replicationControl.getPartition(topicId, 0)); assertNull(replicationControl.getPartition(topicId, 1)); assertNull(replicationControl.getPartition(topicId, 2));
