vvcephei commented on code in PR #12641:
URL: https://github.com/apache/kafka/pull/12641#discussion_r972139781
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -233,6 +235,15 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_TAG_PREFIX = "client.tag.";
+ /** {@code topology.optimization} */
+ public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+ private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+ + "Streams if it should optimize the topology and what optimizations to apply. "
+ + "Acceptable values are: \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+ + "or a comma separated list of specific optimizations: "
+ + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\"). "
+ + "Disabled by default.";
Review Comment:
```suggestion
+ NO_OPTIMIZATION+" by default.";
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1653,6 +1678,34 @@ private Map<String, Object> getClientCustomProps() {
return props;
}
+ public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+ final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
+ final Set<String> verifiedConfigs = new HashSet<>();
+ // Verify it doesn't contain none or all plus a list of optimizations
+ if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
+ if (configs.size() > 1) {
+ throw new ConfigException("A topology can either not be optimized with " + NO_OPTIMIZATION + " "
+ + "or optimized. If you want to optimize the "
+ + "topology, you can choose between all "
+ + "optimizations with " + OPTIMIZE + " " + "or "
+ + "specific optimizations by specifying a comma "
+ + "separated list.");
Review Comment:
I took a closer look at the text of this error, and I'm afraid it might be
confusing to users. The rendered text will say "A topology can either not be
optimized with none or optimized.", which seems hard to parse to me, both
because the sentence structure is confusing and because the config to turn off
optimizations is mentioned ("none"), but the config to turn them on is not
("all").
```suggestion
throw new ConfigException(
"\"" + config + "\" is not a valid optimization config. " +
"Valid optimization configs are: " +
"\"" + OPTIMIZE + "\" to enable all optimizations, " +
"\"" + NO_OPTIMIZATION + "\" to disable all optimizations, " +
"or a comma separated list of specific optimizations: " +
"(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", \"" + MERGE_REPARTITION_TOPICS + "\")."
);
```
Realistically, I'd probably build the string for the second part of that
message ("Valid ...") once and save it in a private constant, so that both
the doc string and the validation error can use it.
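To make that concrete, here is a rough, self-contained sketch of the shared-constant idea. The class name `TopologyOptimizationMessages`, the constant `VALID_VALUES_DESCRIPTION`, and the helper `invalidConfigMessage` are hypothetical names for illustration only, and the literal values of the two specific optimizations are assumed rather than taken from this diff. In the PR itself, this would live in `StreamsConfig` and the message would be passed to `ConfigException`.

```java
// Hypothetical holder class; in the PR these would be members of StreamsConfig.
final class TopologyOptimizationMessages {
    // "none" and "all" are the rendered values mentioned in the review;
    // the two dotted names below are assumed values for illustration.
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    // Built once, then reused by both the doc string and the validation error.
    static final String VALID_VALUES_DESCRIPTION =
        "Valid optimization configs are: "
            + "\"" + OPTIMIZE + "\" to enable all optimizations, "
            + "\"" + NO_OPTIMIZATION + "\" to disable all optimizations, "
            + "or a comma separated list of specific optimizations: "
            + "(\"" + REUSE_KTABLE_SOURCE_TOPICS + "\", \"" + MERGE_REPARTITION_TOPICS + "\").";

    // Doc string: general description, the shared part, then the default.
    static final String DOC =
        "A configuration telling Kafka Streams if it should optimize the topology "
            + "and what optimizations to apply. "
            + VALID_VALUES_DESCRIPTION
            + " \"" + NO_OPTIMIZATION + "\" by default.";

    // Error text: names the offending value first, then the shared part.
    static String invalidConfigMessage(final String config) {
        return "\"" + config + "\" is not a valid optimization config. " + VALID_VALUES_DESCRIPTION;
    }

    private TopologyOptimizationMessages() { }
}
```

With this shape, the documented values and the values listed in the error cannot drift apart, since both come from the same string.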
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1263,6 +1263,54 @@ public void testInvalidSecurityProtocol() {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+ final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+ final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("A topology can either not be optimized with"));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenOptimizationDoesNotExist() {
+ final String value = String.join(",",
+ StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+ "topology.optimization.does.not.exist",
+ StreamsConfig.MERGE_REPARTITION_TOPICS);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("Unrecognized config."));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+ final String value = String.join(",", "topology.optimization.does.not.exist");
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("Unrecognized config."));
+ }
+
+ @Test
+ public void shouldAllowMultipleOptimizations() {
Review Comment:
Can we add a test that "all" results in all the expected optimizations being
listed? And that "none" results in an empty list?
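As a rough illustration of the behavior I have in mind, here is a standalone sketch. It is not the PR's implementation: `TopologyOptimizationSketch` and `verify` are hypothetical names, `IllegalArgumentException` stands in for `ConfigException` so that the snippet compiles without Kafka on the classpath, and the literal values of the two specific optimizations are assumed. It also assumes that "all" expands to every known optimization and "none" to an empty set, which is exactly what the requested tests would pin down.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for StreamsConfig.verifyTopologyOptimizationConfigs.
final class TopologyOptimizationSketch {
    static final String NO_OPTIMIZATION = "none";
    static final String OPTIMIZE = "all";
    // Assumed values for the two specific optimizations.
    static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
    static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";

    private static final Set<String> ALL_OPTIMIZATIONS = Collections.unmodifiableSet(
        new LinkedHashSet<>(Arrays.asList(REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS)));

    static Set<String> verify(final String config) {
        final List<String> configs = Arrays.asList(config.trim().split("\\s*,\\s*"));
        if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
            // "none" and "all" are only valid on their own.
            if (configs.size() > 1) {
                throw new IllegalArgumentException(
                    "\"" + config + "\" is not a valid optimization config.");
            }
            // Assumed expansion: "all" -> every optimization, "none" -> nothing.
            return configs.contains(OPTIMIZE) ? ALL_OPTIMIZATIONS : Collections.<String>emptySet();
        }
        final Set<String> verified = new LinkedHashSet<>();
        for (final String name : configs) {
            if (!ALL_OPTIMIZATIONS.contains(name)) {
                throw new IllegalArgumentException(
                    "\"" + name + "\" is not a recognized optimization.");
            }
            verified.add(name);
        }
        return verified;
    }

    public static void main(final String[] args) {
        System.out.println(verify(OPTIMIZE));
        System.out.println(verify(NO_OPTIMIZATION));
    }

    private TopologyOptimizationSketch() { }
}
```

The tests I'm asking for would then assert that the result for "all" contains both specific optimizations and that the result for "none" is empty, alongside the existing negative cases.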