hachikuji commented on code in PR #13104:
URL: https://github.com/apache/kafka/pull/13104#discussion_r1067582346


##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -605,6 +617,45 @@ public void testCreateTopicsWithConfigs() throws Exception 
{
             "Null value not supported for topic configs: foo",
             result2.response().topics().find("bar").errorMessage()
         );
+
+        CreateTopicsRequestData request3 = new CreateTopicsRequestData();
+        request3.topics().add(new CreatableTopic().setName("baz")
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        ControllerResult<CreateTopicsResponseData> result3 =
+            replicationControl.createTopics(request3, 
Collections.singleton("baz"));
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result3.response().topics().find("baz").errorCode());
+        assertEquals(Collections.emptyList(), result3.records());
+
+        // Test request with multiple topics together.
+        CreateTopicsRequestData request4 = new CreateTopicsRequestData();
+        String batchedTopic1 = "batched-topic-1";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic1)
+            .setNumPartitions(-1).setReplicationFactor((short) -1)
+            .setConfigs(validConfigs));
+        String batchedTopic2 = "batched-topic2";
+        request4.topics().add(new CreatableTopic().setName(batchedTopic2)
+            .setNumPartitions(-1).setReplicationFactor((short) -2)
+            .setConfigs(validConfigs));
+
+        Set<String> request4Topics = new HashSet<>();
+        request4Topics.add(batchedTopic1);
+        request4Topics.add(batchedTopic2);
+        ControllerResult<CreateTopicsResponseData> result4 =
+            replicationControl.createTopics(request4, request4Topics);
+
+        assertEquals(Errors.NONE.code(), 
result4.response().topics().find(batchedTopic1).errorCode());
+        assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result4.response().topics().find(batchedTopic2).errorCode());
+
+        assertEquals(3, result4.records().size());
+        assertEquals(TopicRecord.class, 
result4.records().get(0).message().getClass());
+        TopicRecord batchedTopic1Record = (TopicRecord) 
result4.records().get(0).message();
+        assertEquals(batchedTopic1, batchedTopic1Record.name());
+        assertEquals(ConfigRecord.class, 
result4.records().get(1).message().getClass());
+        assertEquals(batchedTopic1, ((ConfigRecord) 
result4.records().get(1).message()).resourceName());

Review Comment:
   Wonder if it would be better to assert the full ConfigRecord instead of just 
checking one field? 



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> 
incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> 
resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = 
incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> 
keyToOps,
+        boolean newlyCreatedResource) {
+        List<ApiMessageAndVersion> outputRecords = new ArrayList<>();
+        ApiError apiError = incrementalAlterConfigResource(configResource,
+            keyToOps,
+            newlyCreatedResource,
+            outputRecords);
+        return ControllerResult.atomicOf(outputRecords, apiError);
+    }
+
+    private ApiError incrementalAlterConfigResource(ConfigResource 
configResource,
                                                 Map<String, Entry<OpType, 
String>> keysToOps,
                                                 boolean newlyCreatedResource,
-                                                List<ApiMessageAndVersion> 
outputRecords,
-                                                Map<ConfigResource, ApiError> 
outputResults) {
+                                                List<ApiMessageAndVersion> 
outputRecords) {

Review Comment:
   nit: can we align the arguments?



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -172,20 +172,30 @@ ControllerResult<Map<ConfigResource, ApiError>> 
incrementalAlterConfigs(
         Map<ConfigResource, ApiError> outputResults = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<OpType, String>>> 
resourceEntry :
                 configChanges.entrySet()) {
-            incrementalAlterConfigResource(resourceEntry.getKey(),
+            ApiError apiError = 
incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords,
-                outputResults);
+                outputRecords);
+            outputResults.put(resourceEntry.getKey(), apiError);
         }
         return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
-    private void incrementalAlterConfigResource(ConfigResource configResource,
+    ControllerResult<ApiError> incrementalAlterConfig(
+        ConfigResource configResource, Map<String, Entry<OpType, String>> 
keyToOps,

Review Comment:
   nit: move second argument to next line. Not everyone agrees, but I also like 
to put the closing parenthesis on a new line.
   ```java
   ControllerResult<ApiError> incrementalAlterConfig(
       ConfigResource configResource, 
       Map<String, Entry<OpType, String>> keyToOps,
       boolean newlyCreatedResource
   ) {
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to