hachikuji commented on code in PR #13104: URL: https://github.com/apache/kafka/pull/13104#discussion_r1067582346
########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception { "Null value not supported for topic configs: foo", result2.response().topics().find("bar").errorMessage() ); + + CreateTopicsRequestData request3 = new CreateTopicsRequestData(); + request3.topics().add(new CreatableTopic().setName("baz") + .setNumPartitions(-1).setReplicationFactor((short) -2) + .setConfigs(validConfigs)); + + ControllerResult<CreateTopicsResponseData> result3 = + replicationControl.createTopics(request3, Collections.singleton("baz")); + assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode()); + assertEquals(Collections.emptyList(), result3.records()); + + // Test request with multiple topics together. + CreateTopicsRequestData request4 = new CreateTopicsRequestData(); + String batchedTopic1 = "batched-topic-1"; + request4.topics().add(new CreatableTopic().setName(batchedTopic1) + .setNumPartitions(-1).setReplicationFactor((short) -1) + .setConfigs(validConfigs)); + String batchedTopic2 = "batched-topic2"; + request4.topics().add(new CreatableTopic().setName(batchedTopic2) + .setNumPartitions(-1).setReplicationFactor((short) -2) + .setConfigs(validConfigs)); + + Set<String> request4Topics = new HashSet<>(); + request4Topics.add(batchedTopic1); + request4Topics.add(batchedTopic2); + ControllerResult<CreateTopicsResponseData> result4 = + replicationControl.createTopics(request4, request4Topics); + + assertEquals(Errors.NONE.code(), result4.response().topics().find(batchedTopic1).errorCode()); + assertEquals(INVALID_REPLICATION_FACTOR.code(), result4.response().topics().find(batchedTopic2).errorCode()); + + assertEquals(3, result4.records().size()); + assertEquals(TopicRecord.class, result4.records().get(0).message().getClass()); + TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message(); + assertEquals(batchedTopic1, batchedTopic1Record.name()); + assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass()); + assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName()); Review Comment: Wonder if it would be better to assert the full ConfigRecord instead of just checking one field? ########## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ########## @@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs( Map<ConfigResource, ApiError> outputResults = new HashMap<>(); for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry : configChanges.entrySet()) { - incrementalAlterConfigResource(resourceEntry.getKey(), + ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(), resourceEntry.getValue(), newlyCreatedResource, - outputRecords, - outputResults); + outputRecords); + outputResults.put(resourceEntry.getKey(), apiError); } return ControllerResult.atomicOf(outputRecords, outputResults); } - private void incrementalAlterConfigResource(ConfigResource configResource, + ControllerResult<ApiError> incrementalAlterConfig( + ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps, + boolean newlyCreatedResource) { + List<ApiMessageAndVersion> outputRecords = new ArrayList<>(); + ApiError apiError = incrementalAlterConfigResource(configResource, + keyToOps, + newlyCreatedResource, + outputRecords); + return ControllerResult.atomicOf(outputRecords, apiError); + } + + private ApiError incrementalAlterConfigResource(ConfigResource configResource, Map<String, Entry<OpType, String>> keysToOps, boolean newlyCreatedResource, - List<ApiMessageAndVersion> outputRecords, - Map<ConfigResource, ApiError> outputResults) { + List<ApiMessageAndVersion> outputRecords) { Review Comment: nit: can we align the arguments? ########## metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java: ########## @@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs( Map<ConfigResource, ApiError> outputResults = new HashMap<>(); for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> resourceEntry : configChanges.entrySet()) { - incrementalAlterConfigResource(resourceEntry.getKey(), + ApiError apiError = incrementalAlterConfigResource(resourceEntry.getKey(), resourceEntry.getValue(), newlyCreatedResource, - outputRecords, - outputResults); + outputRecords); + outputResults.put(resourceEntry.getKey(), apiError); } return ControllerResult.atomicOf(outputRecords, outputResults); } - private void incrementalAlterConfigResource(ConfigResource configResource, + ControllerResult<ApiError> incrementalAlterConfig( + ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps, Review Comment: nit: move second argument to next line. Not everyone agrees, but I also like to put the closing parenthesis on a new line. ```java ControllerResult<ApiError> incrementalAlterConfig( ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps, boolean newlyCreatedResource ) { ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org