This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cda5da9b65f KAFKA-14209: Change Topology optimization to accept list 
of rules 1/3 (#12641)
cda5da9b65f is described below

commit cda5da9b65f78b51cdfe5371f712a0d392dbdb4d
Author: Vicky Papavasileiou <[email protected]>
AuthorDate: Thu Sep 22 17:20:37 2022 +0100

    KAFKA-14209: Change Topology optimization to accept list of rules 1/3 
(#12641)
    
    This PR is part of a series implementing the self-join rewriting. As part of
it, we decided to clean up TOPOLOGY_OPTIMIZATION_CONFIG and make it a list of
optimization rules. Acceptable values are: NO_OPTIMIZATION; OPTIMIZE, which
applies all optimization rules; or a comma-separated list of specific
optimizations.
    
    Reviewers: Guozhang Wang <[email protected]>, John Roesler 
<[email protected]>
---
 docs/streams/developer-guide/config-streams.html   | 13 +++--
 .../org/apache/kafka/streams/StreamsBuilder.java   |  6 +--
 .../org/apache/kafka/streams/StreamsConfig.java    | 59 ++++++++++++++++++--
 .../kstream/internals/InternalStreamsBuilder.java  | 39 ++++++++++----
 .../apache/kafka/streams/StreamsConfigTest.java    | 63 ++++++++++++++++++++++
 5 files changed, 157 insertions(+), 23 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index 0aee6b6e1dd..fcc0ea9993f 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -424,8 +424,8 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG), 1);</code></pre>
           </tr>
           <tr class="row-even"><td>topology.optimization</td>
             <td>Medium</td>
-            <td colspan="2">A configuration telling Kafka Streams if it should 
optimize the topology</td>
-            <td>none</td>
+            <td colspan="2">A configuration telling Kafka Streams if it should 
optimize the topology and what optimizations to apply. Acceptable values are: 
<code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), 
<code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated 
list of specific optimizations: 
(<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> 
(<code>reuse.ktable.source.topics</code>), 
<code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge. [...]
+            <td><code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>)</td>
           </tr>
           <tr class="row-odd"><td>upgrade.from</td>
             <td>Medium</td>
@@ -942,8 +942,13 @@ 
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
       <blockquote>
         <div>
           <p>
-            You can tell Streams to apply topology optimizations by setting 
this config. The optimizations are currently all or none and disabled by 
default.
-            These optimizations include moving/reducing repartition topics and 
reusing the source topic as the changelog for source KTables. It is recommended 
to enable this.
+            A configuration telling Kafka Streams if it should optimize the 
topology and what optimizations to apply. Acceptable values are: 
<code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), 
<code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated 
list of specific optimizations: 
(<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> 
(<code>reuse.ktable.source.topics</code>), 
<code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> 
(<code>merge.repartition.topi [...]
+          </p>
+          <p>
+            We recommend listing specific optimizations in the config for 
production code so that the structure of your topology will not change 
unexpectedly during upgrades of the Streams library.
+          </p>
+          <p>
+            These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. They reduce network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
           </p>
           <p>
             Note that as of 2.3, you need to do two things to enable 
optimizations. In addition to setting this config to 
<code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index e913728984e..39ae3821450 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -621,11 +621,7 @@ public class StreamsBuilder {
      * @return the {@link Topology} that represents the specified processing 
logic
      */
     public synchronized Topology build(final Properties props) {
-        final boolean optimizeTopology =
-            props != null &&
-            
StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
-
-        internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology);
+        internalStreamsBuilder.buildAndOptimizeTopology(props); // TOPOLOGY_OPTIMIZATION_CONFIG is parsed by InternalStreamsBuilder; null props applies no optimization rules
         return topology;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 325cbec90d9..183f305a12c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -233,6 +235,18 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String CLIENT_TAG_PREFIX = "client.tag.";
 
+    /** {@code topology.optimization} */
+    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = "topology.optimization";
+
+    /*
+     * Human-readable list of the values accepted by TOPOLOGY_OPTIMIZATION_CONFIG, shared by the
+     * config documentation and the validation error messages.
+     *
+     * The value constants are referenced through the class name on purpose: they are declared
+     * further down in this class, and a simple-name reference from this initializer would be an
+     * illegal forward reference. (The previous version placed the names inside the string
+     * literal, so the message printed "+NO_OPTIMIZATION+" instead of the actual values.)
+     */
+    private static final String CONFIG_ERROR_MSG = "Acceptable values are:"
+        + " \"" + StreamsConfig.NO_OPTIMIZATION + "\", \"" + StreamsConfig.OPTIMIZE + "\", "
+        + "or a comma separated list of specific optimizations: "
+        + "(\"" + StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS + "\", \""
+        + StreamsConfig.MERGE_REPARTITION_TOPICS + "\").";
+
+    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration telling Kafka "
+        + "Streams if it should optimize the topology and what optimizations to apply. "
+        + CONFIG_ERROR_MSG
+        + " \"" + StreamsConfig.NO_OPTIMIZATION + "\" by default.";
+
     /**
      * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG 
"topology.optimization"} for disabling topology optimization
      */
@@ -243,6 +257,22 @@ public class StreamsConfig extends AbstractConfig {
      */
     public static final String OPTIMIZE = "all";
 
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG 
"topology.optimization"}
+     * for enabling the specific optimization that reuses source topic as 
changelog topic
+     * for KTables.
+     */
+    public static final String REUSE_KTABLE_SOURCE_TOPICS = 
"reuse.ktable.source.topics";
+
+    /**
+     * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG 
"topology.optimization"}
+     * for enabling the specific optimization that merges duplicated 
repartition topics.
+     */
+    public static final String MERGE_REPARTITION_TOPICS = 
"merge.repartition.topics";
+
+    private static final List<String> TOPOLOGY_OPTIMIZATION_CONFIGS = 
Arrays.asList(
+        OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, 
MERGE_REPARTITION_TOPICS);
+
     /**
      * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} 
for upgrading an application from version {@code 0.10.0.x}.
      */
@@ -663,9 +693,6 @@ public class StreamsConfig extends AbstractConfig {
         "For a timeout of 0ms, a task would raise an error for the first 
internal error. " +
         "For any timeout larger than 0ms, a task will retry at least once 
before an error is raised.";
 
-    /** {@code topology.optimization} */
-    public static final String TOPOLOGY_OPTIMIZATION_CONFIG = 
"topology.optimization";
-    private static final String TOPOLOGY_OPTIMIZATION_DOC = "A configuration 
telling Kafka Streams if it should optimize the topology, disabled by default";
 
     /** {@code window.size.ms} */
     public static final String WINDOW_SIZE_MS_CONFIG = "window.size.ms";
@@ -845,7 +872,7 @@ public class StreamsConfig extends AbstractConfig {
             .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                     Type.STRING,
                     NO_OPTIMIZATION,
-                    in(NO_OPTIMIZATION, OPTIMIZE),
+                    (name, value) -> 
verifyTopologyOptimizationConfigs((String) value),
                     Importance.MEDIUM,
                     TOPOLOGY_OPTIMIZATION_DOC)
 
@@ -1265,6 +1292,7 @@ public class StreamsConfig extends AbstractConfig {
         if (eosEnabled) {
             verifyEOSTransactionTimeoutCompatibility();
         }
+        
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
     }
 
     private void verifyEOSTransactionTimeoutCompatibility() {
@@ -1653,6 +1681,29 @@ public class StreamsConfig extends AbstractConfig {
         return props;
     }
 
+    /**
+     * Parses and validates a value of {@link #TOPOLOGY_OPTIMIZATION_CONFIG}.
+     *
+     * @param config the configured value: {@link #NO_OPTIMIZATION}, {@link #OPTIMIZE}, or a
+     *               comma-separated list of specific optimizations; whitespace around the whole
+     *               value and around each comma is ignored
+     * @return the specific optimizations to apply: empty for {@link #NO_OPTIMIZATION}, every known
+     *         optimization for {@link #OPTIMIZE}, otherwise exactly the listed ones
+     * @throws ConfigException if {@code config} is {@code null}, combines {@link #NO_OPTIMIZATION}
+     *                         or {@link #OPTIMIZE} with other values, or contains an unrecognized
+     *                         optimization
+     */
+    public static Set<String> verifyTopologyOptimizationConfigs(final String config) {
+        if (config == null) {
+            // The ConfigDef validator lambda forwards the raw value unchanged, so report a config
+            // error here instead of failing with a NullPointerException from split().
+            throw new ConfigException("Topology optimization config must not be null. " + CONFIG_ERROR_MSG);
+        }
+        // trim() strips whitespace around the whole value; the regex absorbs it around commas.
+        final List<String> configs = Arrays.asList(config.trim().split("\\s*,\\s*"));
+        // NO_OPTIMIZATION and OPTIMIZE are standalone values and cannot be mixed with anything else.
+        if ((configs.contains(NO_OPTIMIZATION) || configs.contains(OPTIMIZE)) && configs.size() > 1) {
+            throw new ConfigException("\"" + config + "\" is not a valid optimization config. " + CONFIG_ERROR_MSG);
+        }
+        for (final String conf : configs) {
+            if (!TOPOLOGY_OPTIMIZATION_CONFIGS.contains(conf)) {
+                throw new ConfigException("Unrecognized config. \"" + conf + "\" is not a known optimization. "
+                    + CONFIG_ERROR_MSG);
+            }
+        }
+        final Set<String> verifiedConfigs = new HashSet<>();
+        if (configs.contains(OPTIMIZE)) {
+            verifiedConfigs.add(REUSE_KTABLE_SOURCE_TOPICS);
+            verifiedConfigs.add(MERGE_REPARTITION_TOPICS);
+        } else if (!configs.contains(NO_OPTIMIZATION)) {
+            verifiedConfigs.addAll(configs);
+        }
+        return verifiedConfigs;
+    }
+
     /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of 
{@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde
      * class}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index ac9d281be4e..7f6753659a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Properties;
 import java.util.TreeMap;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -270,17 +272,12 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
 
     // use this method for testing only
     public void buildAndOptimizeTopology() {
-        buildAndOptimizeTopology(false);
+        buildAndOptimizeTopology(null); // null properties: the topology is built without applying any optimization rules
     }
 
-    public void buildAndOptimizeTopology(final boolean optimizeTopology) {
-
+    public void buildAndOptimizeTopology(final Properties props) {
         mergeDuplicateSourceNodes();
-        if (optimizeTopology) {
-            LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
-            optimizeKTableSourceTopics();
-            maybeOptimizeRepartitionOperations();
-        }
+        optimizeTopology(props);
 
         final PriorityQueue<GraphNode> graphNodePriorityQueue = new 
PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority));
 
@@ -305,6 +302,28 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         internalTopologyBuilder.validateCopartition();
     }
 
+    /**
+     * A user can provide either the config OPTIMIZE which means all 
optimizations rules will be
+     * applied or they can provide a list of optimization rules.
+     */
+    private void optimizeTopology(final Properties props) {
+        final Set<String> optimizationConfigs;
+        if (props == null || 
!props.containsKey(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)) {
+            optimizationConfigs = Collections.emptySet();
+        } else {
+            optimizationConfigs = 
StreamsConfig.verifyTopologyOptimizationConfigs(
+                (String) 
props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
+        }
+        if 
(optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for ktable source 
nodes");
+            reuseKTableSourceTopics();
+        }
+        if 
(optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+            mergeRepartitionTopics();
+        }
+    }
+
     private void mergeDuplicateSourceNodes() {
         final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new 
HashMap<>();
 
@@ -356,12 +375,12 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         }
     }
 
-    private void optimizeKTableSourceTopics() {
+    private void reuseKTableSourceTopics() { // REUSE_KTABLE_SOURCE_TOPICS rule: every KTable source node reuses its source topic as changelog
         LOG.debug("Marking KTable source nodes to optimize using source topic 
for changelogs ");
         tableSourceNodes.forEach(node -> ((TableSourceNode<?, ?>) 
node).reuseSourceTopicForChangeLog(true));
     }
 
-    private void maybeOptimizeRepartitionOperations() {
+    private void mergeRepartitionTopics() {
         maybeUpdateKeyChangingRepartitionNodeMap();
         final Iterator<Entry<GraphNode, 
LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator =
             
keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 435dd249f2f..001fdd16b07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.Set;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -1263,6 +1264,68 @@ public class StreamsConfigTest {
         
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOnAndOff() {
+        // "all" and "none" are mutually exclusive, so listing both must be rejected.
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE + "," + StreamsConfig.NO_OPTIMIZATION);
+
+        final String message = assertThrows(ConfigException.class, () -> new StreamsConfig(props)).getMessage();
+
+        assertTrue(message.contains("is not a valid optimization config"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationOffAndSet() {
+        // "none" cannot be combined with a specific optimization rule.
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION + "," + StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS);
+
+        final String message = assertThrows(ConfigException.class, () -> new StreamsConfig(props)).getMessage();
+
+        assertTrue(message.contains("is not a valid optimization config"));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenOptimizationDoesNotExistInList() {
+        // One unknown entry invalidates an otherwise valid list of optimizations.
+        props.put(
+            TOPOLOGY_OPTIMIZATION_CONFIG,
+            String.join(
+                ",",
+                StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                "topology.optimization.does.not.exist",
+                StreamsConfig.MERGE_REPARTITION_TOPICS));
+
+        final String message = assertThrows(ConfigException.class, () -> new StreamsConfig(props)).getMessage();
+
+        assertTrue(message.contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenTopologyOptimizationDoesNotExist() {
+        // A single unknown optimization name must be rejected; no joining is needed for one value.
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, "topology.optimization.does.not.exist");
+        final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Unrecognized config."));
+    }
+
+    @Test
+    public void shouldAllowMultipleOptimizations() {
+        final String value = String.join(",",
+                                         StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS,
+                                         StreamsConfig.MERGE_REPARTITION_TOPICS);
+        props.put(TOPOLOGY_OPTIMIZATION_CONFIG, value);
+        final StreamsConfig config = new StreamsConfig(props);
+
+        // The raw config value is accepted and retained as configured...
+        final List<String> configs = Arrays.asList(config.getString(TOPOLOGY_OPTIMIZATION_CONFIG).split(","));
+        assertEquals(2, configs.size());
+        assertTrue(configs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+        assertTrue(configs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+
+        // ...and it parses into exactly the listed optimization rules.
+        final Set<String> optimizations = StreamsConfig.verifyTopologyOptimizationConfigs(value);
+        assertEquals(2, optimizations.size());
+        assertTrue(optimizations.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+        assertTrue(optimizations.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+    }
+
+    @Test
+    public void shouldEnableAllOptimizationsWithOptimizeConfig() {
+        // "all" expands to every specific optimization rule.
+        final Set<String> enabled = StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.OPTIMIZE);
+        assertTrue(enabled.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS));
+        assertTrue(enabled.contains(StreamsConfig.MERGE_REPARTITION_TOPICS));
+        assertEquals(2, enabled.size());
+    }
+
+    @Test
+    public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() {
+        // "none" yields an empty set of optimization rules.
+        assertEquals(0, StreamsConfig.verifyTopologyOptimizationConfigs(StreamsConfig.NO_OPTIMIZATION).size());
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean 
isKey) {

Reply via email to