vvcephei commented on code in PR #12641:
URL: https://github.com/apache/kafka/pull/12641#discussion_r972139781


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -233,6 +235,15 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
+    /** {@code topology.optimization} */
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply. "
+        + "Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+        + "or a comma separated list of specific optimizations: "
+        + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\"). "
+        + "Disabled by default.";

Review Comment:
   ```suggestion
           + NO_OPTIMIZATION + " by default.";
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1653,6 +1678,34 @@ private Map<String, Object> getClientCustomProps() {
         return props;
     }
 
+    public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+        final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
+        final Set<String> verifiedConfigs = new HashSet<>();
+        // Verify it doesn't contain none or all plus a list of optimizations
+        if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
+            if (configs.size() > 1) {
+                throw new ConfigException("A topology can either not be optimized with " + NO_OPTIMIZATION + " "
+                                              + "or optimized. If you want to optimize the "
+                                              + "topology, you can choose between all "
+                                              + "optimizations with " + OPTIMIZE + " " + "or "
+                                              + "specific optimizations by specifying a comma "
+                                              + "separated list.");

Review Comment:
   I took a closer look at the text of this error, and I'm afraid it might be 
confusing to users. The rendered text will say "A topology can either not be 
optimized with none or optimized.", which seems hard to parse to me, both 
because the sentence structure is confusing and because the config to turn off 
optimizations is mentioned ("none"), but the config to turn them on is not 
("all").
   
   ```suggestion
                   throw new ConfigException(
                       "\"" + config + "\" is not a valid optimization config. " +
                       "Valid optimization configs are: " +
                       "\"" + OPTIMIZE + "\" to enable all optimizations, " +
                       "\"" + NO_OPTIMIZATION + "\" to disable all optimizations, " +
                       "or a comma separated list of specific optimizations: " +
                       "(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", " +
                       "\"" + MERGE_REPARTITION_TOPICS + "\")."
                   );
   ```
   
   Realistically, I'd probably build the string for the second part of that 
message ("Valid ...") once and save it in a private constant for use in both 
the doc string as well as the validation error.
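
A minimal sketch of that idea, assuming the four constants hold the literal values shown below (only "none" and "all" are confirmed by the rendered error text above; the other two are assumptions). The names `VALID_OPTIMIZATIONS_MSG`, `invalidConfigMessage`, and the class itself are hypothetical, and the sketch returns plain strings instead of throwing `ConfigException` so that it runs without Kafka on the classpath:

```java
class TopologyOptimizationMessageSketch {
    // "none" and "all" match the rendered text quoted in this review;
    // the two specific optimization names are assumed values.
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    // Hypothetical shared constant: built once, reused by the doc string and the validation error.
    private static final String VALID_OPTIMIZATIONS_MSG =
        "Valid optimization configs are: "
        + "\"" + OPTIMIZE + "\" to enable all optimizations, "
        + "\"" + NO_OPTIMIZATION + "\" to disable all optimizations, "
        + "or a comma separated list of specific optimizations: "
        + "(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", \"" + MERGE_REPARTITION_TOPICS + "\").";

    // Doc string: general description, then the shared list of valid values, then the default.
    static final String TOPOLOGY_OPTIMIZATION_DOC =
        "A configuration telling Kafka Streams if it should optimize the topology "
        + "and what optimizations to apply. "
        + VALID_OPTIMIZATIONS_MSG
        + " \"" + NO_OPTIMIZATION + "\" by default.";

    // Error text: names the offending value, then the same shared list of valid values.
    static String invalidConfigMessage(final String config) {
        return "\"" + config + "\" is not a valid optimization config. " + VALID_OPTIMIZATIONS_MSG;
    }

    public static void main(final String[] args) {
        System.out.println(TOPOLOGY_OPTIMIZATION_DOC);
        System.out.println(invalidConfigMessage("all,none"));
    }
}
```

With a shared constant, adding a new optimization name later means updating one string rather than keeping the doc and the error message in sync by hand.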



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,54 @@ public void testInvalidSecurityProtocol() {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+        final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenOptimizationDoesNotExist() {
+        final String value = String.join(",",
+                                         StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                                         "topology.optimization.does.not.exist",
+                                         StreamsConfig.MERGE_REPARTITION_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+        final String value = String.join(",", "topology.optimization.does.not.exist");
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldAllowMultipleOptimizations() {

Review Comment:
   Can we add a test that "all" results in all the expected optimizations being listed? And that "none" results in an empty list?
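
The two requested checks could look roughly like the sketch below. It is self-contained plain Java rather than a JUnit test against `StreamsConfig`: `verify` is a hypothetical stand-in for `verifyTopologyOptimizationConfigs`, written under the assumption that "all" expands to the two optimizations named in the diff and that "none" yields an empty set. The constant values and the use of `IllegalArgumentException` in place of `ConfigException` are also assumptions.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TopologyOptimizationExpansionSketch {
    // Assumed literal values for the constants referenced in the diff.
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    // Assumption: these two are the complete set of specific optimizations.
    static final Set<String> ALL_OPTIMIZATIONS = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList(REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS)));

    // Hypothetical stand-in for the method under review, not the PR's implementation.
    static Set<String> verify(final String config) {
        final List<String> configs = Arrays.asList(config.trim().split("\\s*,\\s*"));
        final Set<String> verified = new HashSet<>();
        if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
            // "none" and "all" may not be combined with anything else.
            if (configs.size() > 1) {
                throw new IllegalArgumentException("\"" + config + "\" is not a valid optimization config.");
            }
            if (configs.contains(OPTIMIZE)) {
                verified.addAll(ALL_OPTIMIZATIONS);
            }
            return verified; // empty for "none"
        }
        for (final String name : configs) {
            if (!ALL_OPTIMIZATIONS.contains(name)) {
                throw new IllegalArgumentException("Unrecognized config. " + name);
            }
            verified.add(name);
        }
        return verified;
    }

    static void check(final boolean condition, final String description) {
        if (!condition) {
            throw new AssertionError(description);
        }
    }

    public static void main(final String[] args) {
        // "all" should expand to every known optimization.
        check(verify(OPTIMIZE).equals(ALL_OPTIMIZATIONS), "all expands to every optimization");
        // "none" should produce an empty result.
        check(verify(NO_OPTIMIZATION).isEmpty(), "none yields an empty set");
        System.out.println("ok");
    }
}
```

In the real test class, the same two assertions would presumably be written with `assertEquals` against the value returned by the method in the PR.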



