This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cda5da9b65f KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)
cda5da9b65f is described below
commit cda5da9b65f78b51cdfe5371f712a0d392dbdb4d
Author: Vicky Papavasileiou <[email protected]>
AuthorDate: Thu Sep 22 17:20:37 2022 +0100
KAFKA-14209: Change Topology optimization to accept list of rules 1/3 (#12641)
This PR is part of a series implementing the self-join rewriting. As part
of it, we decided to clean up TOPOLOGY_OPTIMIZATION_CONFIG and make it a
list of optimization rules. Acceptable values are NO_OPTIMIZATION,
OPTIMIZE (which applies all optimization rules), or a comma-separated
list of specific optimizations.
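For illustration only, a minimal sketch of how an application could opt in to
specific rules under the new config (the application id, bootstrap servers,
and topology contents are placeholders; only the config constants come from
this patch):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class OptimizationRulesExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimization-rules-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Request two specific rules instead of OPTIMIZE ("all") or NO_OPTIMIZATION ("none").
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                      String.join(",",
                                  StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
                                  StreamsConfig.MERGE_REPARTITION_TOPICS));

            final StreamsBuilder builder = new StreamsBuilder();
            // ... define the processing topology here ...

            // The properties must be passed to build() so the rewriter can see the config.
            final Topology topology = builder.build(props);
        }
    }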
Reviewers: Guozhang Wang <[email protected]>, John Roesler <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 13 +++--
.../org/apache/kafka/streams/StreamsBuilder.java | 6 +--
.../org/apache/kafka/streams/StreamsConfig.java | 59 ++++++++++++++++++--
.../kstream/internals/InternalStreamsBuilder.java | 39 ++++++++++----
.../apache/kafka/streams/StreamsConfigTest.java | 63 ++++++++++++++++++++++
5 files changed, 157 insertions(+), 23 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index 0aee6b6e1dd..fcc0ea9993f 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -424,8 +424,8 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);</code></pre>
</tr>
<tr class="row-even"><td>topology.optimization</td>
<td>Medium</td>
- <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology</td>
- <td>none</td>
+ <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge. [...]
+ <td><code> NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-odd"><td>upgrade.from</td>
<td>Medium</td>
@@ -942,8 +942,13 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
<blockquote>
<div>
<p>
- You can tell Streams to apply topology optimizations by setting this config. The optimizations are currently all or none and disabled by default.
- These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. It is recommended to enable this.
+ A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topi [...]
+ </p>
+ <p>
+ We recommend listing specific optimizations in the config for production code so that the structure of your topology will not change unexpectedly during upgrades of the Streams library.
+ </p>
+ <p>
+ These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
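To ground the two-step note above, a minimal sketch assuming a trivial topology (topic name and application settings are illustrative, not taken from this patch):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class EnableAllOptimizationsExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "optimize-all-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.table("input-topic");  // a source KTable that can reuse its source topic as changelog

            // Step 1: pass the properties to build() so the optimizations are applied ...
            final Topology topology = builder.build(props);
            // Step 2: ... and pass the same properties to KafkaStreams as usual.
            final KafkaStreams streams = new KafkaStreams(topology, props);
        }
    }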
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index e913728984e..39ae3821450 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -621,11 +621,7 @@ public class StreamsBuilder {
* @return the {@link Topology} that represents the specified processing logic
*/
public synchronized Topology build(final Properties props) {
- final boolean optimizeTopology =
- props != null &&
- StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
-
- internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology);
+ internalStreamsBuilder.buildAndOptimizeTopology(props);
return topology;
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 325cbec90d9..183f305a12c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams;
+import java.util.Arrays;
+import java.util.HashSet;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -233,6 +235,18 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_TAG_PREFIX = "client.tag.";
+ /** {@code topology.optimization} */
+ private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+ + " \"+NO_OPTIMIZATION+\", \"+OPTIMIZE+\", "
+ + "or a comma separated list of specific optimizations: "
+ + "(\"+REUSE_KTABLE_SOURCE_TOPICS+\", \"+MERGE_REPARTITION_TOPICS+\").";
+
+ public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+ private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+ + "Streams if it should optimize the topology and what optimizations to apply. "
+ + CONFIG_ERROR_MSG
+ + "\"NO_OPTIMIZATION\" by default.";
+
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
*/
@@ -243,6 +257,22 @@ public class StreamsConfig extends AbstractConfig {
*/
public static final String OPTIMIZE = "all";
+ /**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+ * for enabling the specific optimization that reuses source topic as changelog topic
+ * for KTables.
+ */
+ public static final String REUSE_KTABLE_SOURCE_TOPICS = "reuse.ktable.source.topics";
+
+ /**
+ * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"}
+ * for enabling the specific optimization that merges duplicated repartition topics.
+ */
+ public static final String MERGE_REPARTITION_TOPICS = "merge.repartition.topics";
+
+ private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS = Arrays.asList(
+ OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS);
+
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@@ -663,9 +693,6 @@ public class StreamsConfig extends AbstractConfig {
"For a timeout of 0ms, a task would raise an error for the first internal error. " +
"For any timeout larger than 0ms, a task will retry at least once before an error is raised.";
- /** {@code topology.optimization} */
- public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
- private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka Streams if it should optimize the topology, disabled by default";
/** {@code window.size.ms} */
public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
@@ -845,7 +872,7 @@ public class StreamsConfig extends AbstractConfig {
.define(TOPOLOGY_OPTIMIZATION_CONFIG,
Type.STRING,
NO_OPTIMIZATION,
- in(NO_OPTIMIZATION, OPTIMIZE),
+ (name, value) -> verifyTopologyOptimizationConfigs((String) value),
Importance.MEDIUM,
TOPOLOGY_OPTIMIZATION_DOC)
@@ -1265,6 +1292,7 @@ public class StreamsConfig extends AbstractConfig {
if (eosEnabled) {
verifyEOSTransactionTimeoutCompatibility();
}
+ verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
}
private void verifyEOSTransactionTimeoutCompatibility() {
@@ -1653,6 +1681,29 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
+ public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+ final List<String> configs = Arrays.asList(config.split("\\s*,\\s*"));
+ final Set<String> verifiedConfigs = new HashSet<>();
+ // Verify it doesn't contain none or all plus a list of optimizations
+ if (configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) {
+ if (configs.size() > 1) {
+ throw new ConfigException("\"" + config + "\" is not a valid optimization config. " + CONFIG_ERROR_MSG);
+ }
+ }
+ for (final String conf: configs) {
+ if (!TOPOLOGY_OPTIMIZATION_CONFIGS.contains(conf)) {
+ throw new ConfigException("Unrecognized config. " + CONFIG_ERROR_MSG);
+ }
+ }
+ if (configs.contains(OPTIMIZE)) {
+ verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
+ verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
+ } else if (!configs.contains(NO_OPTIMIZATION)) {
+ verifiedConfigs.addAll(configs);
+ }
+ return verifiedConfigs;
+ }
+
/**
* Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
* class}.
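For reference, a small sketch of how the new public validator resolves the different values, following the logic in this patch (the printed set ordering is not significant):

    import java.util.Set;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.streams.StreamsConfig;

    public class VerifyOptimizationConfigsExample {
        public static void main(final String[] args) {
            // "all" expands to every individual rule.
            final Set<String> all = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
            System.out.println(all);   // reuse.ktable.source.topics and merge.repartition.topics

            // "none" resolves to an empty set of rules.
            final Set<String> none = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION);
            System.out.println(none);  // []

            // A single named rule is returned as-is.
            final Set<String> one = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.MERGE_REPARTITION_TOPICS);
            System.out.println(one);   // merge.repartition.topics

            // Mixing "none" or "all" with specific rules, or using an unknown rule name, is rejected.
            try {
                StreamsConfig.verifyTopologyOptimizationConfigs(
                    StreamsConfig.NO_OPTIMIZATION + "," + StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
            } catch (final ConfigException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }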
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index ac9d281be4e..7f6753659a2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -270,17 +272,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
// use this method for testing only
public void buildAndOptimizeTopology() {
- buildAndOptimizeTopology(false);
+ buildAndOptimizeTopology(null);
}
- public void buildAndOptimizeTopology(final boolean optimizeTopology) {
-
+ public void buildAndOptimizeTopology(final Properties props) {
mergeDuplicateSourceNodes();
- if (optimizeTopology) {
- LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
- optimizeKTableSourceTopics();
- maybeOptimizeRepartitionOperations();
- }
+ optimizeTopology(props);
final PriorityQueue<GraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));
@@ -305,6 +302,28 @@ public class InternalStreamsBuilder implements InternalNameProvider {
internalTopologyBuilder.validateCopartition();
}
+ /**
+ * A user can provide either the config OPTIMIZE which means all optimizations rules will be
+ * applied or they can provide a list of optimization rules.
+ */
+ private void optimizeTopology(final Properties props) {
+ final Set<String> optimizationConfigs;
+ if (props == null || !props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+ optimizationConfigs = Collections.emptySet();
+ } else {
+ optimizationConfigs = StreamsConfig.verifyTopologyOptimizationConfigs(
+ (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+ }
+ if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+ LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes");
+ reuseKTableSourceTopics();
+ }
+ if (optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+ LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+ mergeRepartitionTopics();
+ }
+ }
+
private void mergeDuplicateSourceNodes() {
final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
@@ -356,12 +375,12 @@ public class InternalStreamsBuilder implements InternalNameProvider {
}
}
- private void optimizeKTableSourceTopics() {
+ private void reuseKTableSourceTopics() {
LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) node).reuseSourceTopicForChangeLog(true));
}
- private void maybeOptimizeRepartitionOperations() {
+ private void mergeRepartitionTopics() {
maybeUpdateKeyChangingRepartitionNodeMap();
final Iterator<Entry<GraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
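The rewrite above is driven entirely by the Properties handed to StreamsBuilder.build(Properties); a minimal sketch of the caller-facing effect (the topic name is hypothetical):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class SingleRuleExample {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.table("orders");  // hypothetical source KTable

            // Only the source-topic-reuse rule is requested; merge.repartition.topics stays off.
            final Properties props = new Properties();
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);

            // build(props) forwards the properties to buildAndOptimizeTopology(props);
            // with null or absent properties, optimizeTopology() applies no rules at all.
            final Topology topology = builder.build(props);
        }
    }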
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f..001fdd16b07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
+import java.util.Set;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -1263,6 +1264,68 @@ public class StreamsConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+ final String value = String.join(",", StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("is not a valid optimization config"));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+ final String value = String.join(",", StreamsConfig.NO_OPTIMIZATION, StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("is not a valid optimization config"));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenOptimizationDoesNotExistInList() {
+ final String value = String.join(",",
+ StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+ "topology.optimization.does.not.exist",
+ StreamsConfig.MERGE_REPARTITION_TOPICS);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("Unrecognized config."));
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+ final String value = String.join(",", "topology.optimization.does.not.exist");
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("Unrecognized config."));
+ }
+
+ @Test
+ public void shouldAllowMultipleOptimizations() {
+ final String value = String.join(",",
+ StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+ StreamsConfig.MERGE_REPARTITION_TOPICS);
+ props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+ final StreamsConfig config = new StreamsConfig(props);
+ final List<String> configs = Arrays.asList(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG).split(","));
+ assertEquals(2, configs.size());
+ assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+ assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+ }
+
+ @Test
+ public void shouldEnableAllOptimizationsWithOptimizeConfig() {
+ final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
+ assertEquals(2, configs.size());
+ assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+ assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+ }
+
+ @Test
+ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
+ final Set<String> configs = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION);
+ assertEquals(0, configs.size());
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {