This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new ed6472bcf30 KAFKA-19668: processValues() must be declared as 
value-changing operation (#20470)
ed6472bcf30 is described below

commit ed6472bcf3096c550c509eb25aaeca1c4eb1fb32
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Fri Sep 5 18:00:24 2025 -0700

    KAFKA-19668: processValues() must be declared as value-changing operation 
(#20470)
    
    With the "merge.repartition.topic" optimization enabled, Kafka Streams tries
    to push repartition topics upstream, so that multiple repartition topics
    from different downstream branches can be merged together.
    
    However, it is not safe to push a repartition topic upstream if the parent
    node is value-changing: because the data types may change, the topology
    might become invalid and fail with a serde error at runtime.
    
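    The optimization itself works correctly; however, processValues() is not
    declared as value-changing, which can lead to invalid topologies.
    
    This commit declares processValues() as value-changing, and process() as
    both key-changing and value-changing. The fix is disabled by default and
    can be enabled via the internal config "__enable.process.processValue.fix__"
    when creating a StreamsBuilder from a TopologyConfig.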
    
    Reviewers: Bill Bejeck <b...@confluent.io>, Lucas Brutschy
     <lbruts...@confluent.io>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |  11 +-
 .../org/apache/kafka/streams/TopologyConfig.java   |  23 +++
 .../kstream/internals/InternalStreamsBuilder.java  |   8 +-
 .../streams/kstream/internals/KStreamImpl.java     |  13 +-
 .../internals/InternalStreamsBuilderTest.java      |   6 +-
 .../internals/MaterializedInternalTest.java        |   6 +-
 .../kstream/internals/graph/StreamsGraphTest.java  | 182 +++++++++++++++++++--
 .../internals/RepartitionOptimizingTest.java       |  35 ++++
 .../processor/internals/StreamThreadTest.java      |   2 +-
 .../internals/StreamsPartitionAssignorTest.java    |   2 +-
 10 files changed, 265 insertions(+), 23 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index e56a4cbfb4e..2dbf48d67a4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -75,7 +75,7 @@ public class StreamsBuilder {
     public StreamsBuilder() {
         topology = new Topology();
         internalTopologyBuilder = topology.internalTopologyBuilder;
-        internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
+        internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder, false);
     }
 
     /**
@@ -87,7 +87,14 @@ public class StreamsBuilder {
     public StreamsBuilder(final TopologyConfig topologyConfigs) {
         topology = newTopology(topologyConfigs);
         internalTopologyBuilder = topology.internalTopologyBuilder;
-        internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
+        internalStreamsBuilder = new InternalStreamsBuilder(
+            internalTopologyBuilder,
+            TopologyConfig.InternalConfig.getBoolean(
+                topologyConfigs.originals(),
+                TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
+                false
+            )
+        );
     }
 
     protected Topology newTopology(final TopologyConfig topologyConfigs) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index da8c246b26d..fd76f07686a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Supplier;
@@ -86,6 +87,28 @@ import static 
org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSi
  */
 @SuppressWarnings("deprecation")
 public final class TopologyConfig extends AbstractConfig {
+
+    public static class InternalConfig {
+        // Cf https://issues.apache.org/jira/browse/KAFKA-19668
+        public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = 
"__enable.process.processValue.fix__";
+
+        public static boolean getBoolean(final Map<String, Object> configs, 
final String key, final boolean defaultValue) {
+            final Object value = configs.getOrDefault(key, defaultValue);
+            if (value instanceof Boolean) {
+                return (boolean) value;
+            } else if (value instanceof String) {
+                return Boolean.parseBoolean((String) value);
+            } else {
+                log.warn(
+                    "Invalid value ({}) on internal configuration '{}'. Please 
specify a true/false value.",
+                    value,
+                    key
+                );
+                return defaultValue;
+            }
+        }
+    }
+
     private static final ConfigDef CONFIG;
     static {
         CONFIG = new ConfigDef()
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 968276bd501..6460313a62b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
     private static final String TABLE_SOURCE_SUFFIX = "-source";
 
     final InternalTopologyBuilder internalTopologyBuilder;
+    private final boolean processProcessValueFixEnabled;
     private final AtomicInteger index = new AtomicInteger(0);
 
     private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
@@ -91,8 +92,10 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         }
     };
 
-    public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuilder) {
+    public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuilder,
+                                  final boolean processProcessValueFixEnabled) 
{
         this.internalTopologyBuilder = internalTopologyBuilder;
+        this.processProcessValueFixEnabled = processProcessValueFixEnabled;
     }
 
     public <K, V> KStream<K, V> stream(final Collection<String> topics,
@@ -709,4 +712,7 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         return internalTopologyBuilder;
     }
 
+    public boolean processProcessValueFixEnabled() {
+        return processProcessValueFixEnabled;
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 1927aed03fa..02342a9de4a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1306,7 +1306,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new ProcessorToStateConnectorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames);
+            stateStoreNames
+        );
+        if (builder.processProcessValueFixEnabled()) {
+            processNode.setKeyChangingOperation(true);
+            processNode.setValueChangingOperation(true);
+        }
 
         builder.addGraphNode(graphNode, processNode);
 
@@ -1350,7 +1355,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final ProcessorToStateConnectorNode<? super K, ? super V> processNode 
= new ProcessorToStateConnectorNode<>(
             name,
             new ProcessorParameters<>(processorSupplier, name),
-            stateStoreNames);
+            stateStoreNames
+        );
+        if (builder.processProcessValueFixEnabled()) {
+            processNode.setValueChangingOperation(true);
+        }
 
         builder.addGraphNode(graphNode, processNode);
         // cannot inherit value serde
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 9ac3ff776ca..0a13a4100e4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest {
 
     private static final String APP_ID = "app-id";
 
-    private final InternalStreamsBuilder builder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
+    private final InternalStreamsBuilder builder = new 
InternalStreamsBuilder(new InternalTopologyBuilder(), false);
     private final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>(Consumed.with(null, null));
     private final String storePrefix = "prefix-";
     private final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new 
MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix);
@@ -93,7 +93,7 @@ public class InternalStreamsBuilderTest {
         assertEquals("Y-0000000001", builder.newProcessorName("Y-"));
         assertEquals("Z-0000000002", builder.newProcessorName("Z-"));
 
-        final InternalStreamsBuilder newBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
+        final InternalStreamsBuilder newBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder(), false);
 
         assertEquals("X-0000000000", newBuilder.newProcessorName("X-"));
         assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-"));
@@ -106,7 +106,7 @@ public class InternalStreamsBuilderTest {
         assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-"));
         assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-"));
 
-        final InternalStreamsBuilder newBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
+        final InternalStreamsBuilder newBuilder = new 
InternalStreamsBuilder(new InternalTopologyBuilder(), false);
 
         assertEquals("X-STATE-STORE-0000000000", 
newBuilder.newStoreName("X-"));
         assertEquals("Y-STATE-STORE-0000000001", 
newBuilder.newStoreName("Y-"));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 24600c57fec..a3eaadd27db 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -95,7 +95,7 @@ public class MaterializedInternalTest {
         final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
             new TopologyConfig("my-topology", config, topologyOverrides));
 
-        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder);
+        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder, false);
 
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized =
             new MaterializedInternal<>(Materialized.as(supplier), 
internalStreamsBuilder, prefix);
@@ -113,7 +113,7 @@ public class MaterializedInternalTest {
         final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
                 new TopologyConfig("my-topology", config, topologyOverrides));
 
-        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder);
+        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder, false);
 
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized =
                 new MaterializedInternal<>(Materialized.as(supplier), 
internalStreamsBuilder, prefix);
@@ -129,7 +129,7 @@ public class MaterializedInternalTest {
         final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
                 new TopologyConfig("my-topology", config, topologyOverrides));
 
-        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder);
+        final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(topologyBuilder, false);
 
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized =
                 new MaterializedInternal<>(Materialized.as(supplier), 
internalStreamsBuilder, prefix);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e046ba19533..1675619480f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Branched;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -36,6 +37,8 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
@@ -127,7 +130,7 @@ public class StreamsGraphTest {
         initializer = () -> "";
         aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
         final ProcessorSupplier<String, String, String, String> 
processorSupplier =
-            () -> new Processor<String, String, String, String>() {
+            () -> new Processor<>() {
                 private ProcessorContext<String, String> context;
 
                 @Override
@@ -185,14 +188,163 @@ public class StreamsGraphTest {
     }
 
     @Test
-    public void 
shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
+    public void 
shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixDisabled()
 {
+        final Topology attemptedOptimize = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, false);
+        final Topology noOptimization = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, 
false);
+
+        System.out.println(attemptedOptimize.describe().toString());
+        System.out.println(noOptimization.describe().toString());
+        assertEquals("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+                "      --> KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+                "      --> KSTREAM-FLATMAPVALUES-0000000010, 
KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: 
[])\n" +
+                "      --> KSTREAM-FILTER-0000000014\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+                "      --> KSTREAM-FILTER-0000000006\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: 
[])\n" +
+                "      --> KSTREAM-FILTER-0000000022\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KSTREAM-MAPVALUES-0000000002\n" +
+                "    Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000013\n" +
+                "      <-- KSTREAM-FLATMAPVALUES-0000000010\n" +
+                "    Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000021\n" +
+                "      <-- KSTREAM-PROCESSVALUES-0000000018\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000006\n" +
+                "    Sink: KSTREAM-SINK-0000000013 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000014\n" +
+                "    Sink: KSTREAM-SINK-0000000021 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000022\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000007 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000004\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000004 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
+                "      --> KTABLE-TOSTREAM-0000000008\n" +
+                "      <-- KSTREAM-SOURCE-0000000007\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000009\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000004\n" +
+                "    Sink: KSTREAM-SINK-0000000009 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000008\n" +
+                "\n" +
+                "  Sub-topology: 2\n" +
+                "    Source: KSTREAM-SOURCE-0000000015 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000012\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000012 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" +
+                "      --> KTABLE-TOSTREAM-0000000016\n" +
+                "      <-- KSTREAM-SOURCE-0000000015\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000017\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000012\n" +
+                "    Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" 
+
+                "      <-- KTABLE-TOSTREAM-0000000016\n" +
+                "\n" +
+                "  Sub-topology: 3\n" +
+                "    Source: KSTREAM-SOURCE-0000000023 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000020\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000020 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" +
+                "      --> KTABLE-TOSTREAM-0000000024\n" +
+                "      <-- KSTREAM-SOURCE-0000000023\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000025\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000020\n" +
+                "    Sink: KSTREAM-SINK-0000000025 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000024\n" +
+                "\n",
+            noOptimization.describe().toString()
+        );
+        assertEquals("Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" +
+                "      --> KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" +
+                "      --> KSTREAM-FLATMAPVALUES-0000000010, 
KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n" +
+                "    Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: 
[])\n" +
+                "      --> KSTREAM-FILTER-0000000014\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" +
+                "      --> KSTREAM-FILTER-0000000006\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: 
[])\n" +
+                "      --> KSTREAM-FILTER-0000000022\n" +
+                "      <-- KSTREAM-KEY-SELECT-0000000001\n" +
+                "    Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KSTREAM-MAPVALUES-0000000002\n" +
+                "    Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000013\n" +
+                "      <-- KSTREAM-FLATMAPVALUES-0000000010\n" +
+                "    Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000021\n" +
+                "      <-- KSTREAM-PROCESSVALUES-0000000018\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000006\n" +
+                "    Sink: KSTREAM-SINK-0000000013 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000014\n" +
+                "    Sink: KSTREAM-SINK-0000000021 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" +
+                "      <-- KSTREAM-FILTER-0000000022\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000007 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000004\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000004 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" +
+                "      --> KTABLE-TOSTREAM-0000000008\n" +
+                "      <-- KSTREAM-SOURCE-0000000007\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000009\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000004\n" +
+                "    Sink: KSTREAM-SINK-0000000009 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000008\n" +
+                "\n" +
+                "  Sub-topology: 2\n" +
+                "    Source: KSTREAM-SOURCE-0000000015 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000012\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000012 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" +
+                "      --> KTABLE-TOSTREAM-0000000016\n" +
+                "      <-- KSTREAM-SOURCE-0000000015\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000017\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000012\n" +
+                "    Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" 
+
+                "      <-- KTABLE-TOSTREAM-0000000016\n" +
+                "\n" +
+                "  Sub-topology: 3\n" +
+                "    Source: KSTREAM-SOURCE-0000000023 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000020\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000020 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" +
+                "      --> KTABLE-TOSTREAM-0000000024\n" +
+                "      <-- KSTREAM-SOURCE-0000000023\n" +
+                "    Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000025\n" +
+                "      <-- KSTREAM-AGGREGATE-0000000020\n" +
+                "    Sink: KSTREAM-SINK-0000000025 (topic: output)\n" +
+                "      <-- KTABLE-TOSTREAM-0000000024\n\n",
+            noOptimization.describe().toString()
+        );
+        assertEquals(3, 
getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+        assertEquals(3, 
getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
+    }
 
-        final Topology attemptedOptimize = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
-        final Topology noOptimization = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION);
+    @Test
+    public void 
shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled()
 {
+        final Topology attemptedOptimize = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true);
+        final Topology noOptimization = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, 
true);
 
         assertEquals(attemptedOptimize.describe().toString(), 
noOptimization.describe().toString());
-        assertEquals(2, 
getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
-        assertEquals(2, 
getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
+        assertEquals(3, 
getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString()));
+        assertEquals(3, 
getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
     }
 
     @Test
@@ -227,20 +379,30 @@ public class StreamsGraphTest {
         assertEquals(2, 
getCountOfRepartitionTopicsFound(noOptimization.describe().toString()));
     }
 
-    private Topology getTopologyWithChangingValuesAfterChangingKey(final 
String optimizeConfig) {
-
-        final StreamsBuilder builder = new StreamsBuilder();
+    private Topology getTopologyWithChangingValuesAfterChangingKey(final 
String optimizeConfig,
+                                                                   final 
boolean enableFix) {
         final Properties properties = new Properties();
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
         properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
optimizeConfig);
+        
properties.put(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, 
enableFix);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(properties)));
 
         final KStream<String, String> inputStream = builder.stream("input");
         final KStream<String, String> mappedKeyStream = 
inputStream.selectKey((k, v) -> k + v);
 
         mappedKeyStream.mapValues(v -> 
v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
         mappedKeyStream.flatMapValues(v -> 
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
+        mappedKeyStream.processValues(
+            () -> new ContextualFixedKeyProcessor<>() {
+                @Override
+                public void process(final FixedKeyRecord<String, String> 
record) {
+                    
context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault())));
+                }
+            }).groupByKey().count().toStream().to("output");
 
         return builder.build(properties);
-
     }
 
     private Topology getTopologyWithRepartitionOperation(final String 
optimizeConfig) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
index afb89fa3c8f..57753df91e8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -42,6 +43,8 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.Stores;
@@ -220,6 +223,38 @@ public class RepartitionOptimizingTest {
         assertThat(joinedOutputTopic.readKeyValuesToMap(), 
equalTo(keyValueListToMap(expectedJoinKeyValues)));
     }
 
+    @Test
+    public void shouldNotPushRepartitionAcrossValueChangingOperation() {
+        
streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        
streamsConfiguration.setProperty(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX,
 "true");
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(streamsConfiguration)));
+
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()).withName("sourceStream"))
+            .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), 
v))
+            .processValues(() -> new ContextualFixedKeyProcessor<String, 
String, Integer>() {
+                @Override
+                public void process(final FixedKeyRecord<String, String> 
record) {
+                    
context().forward(record.withValue(record.value().length()));
+                }
+            })
+            .groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde()))
+            .reduce(Integer::sum)
+            .toStream()
+            .to(AGGREGATION_TOPIC);
+
+        final Topology topology = builder.build(streamsConfiguration);
+
+        topologyTestDriver = new TopologyTestDriver(topology, 
streamsConfiguration);
+
+        final TestInputTopic<String, String> inputTopic = 
topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, 
stringSerializer);
+        final TestOutputTopic<String, Integer> outputTopic = 
topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new 
IntegerDeserializer());
+
+        inputTopic.pipeKeyValueList(getKeyValues());
+
+        assertThat(outputTopic.readKeyValuesToMap(), 
equalTo(keyValueListToMap(expectedAggKeyValues)));
+    }
+
     private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> 
keyValuePairs) {
         final Map<K, V> map = new HashMap<>();
         for (final KeyValue<K, V> pair : keyValuePairs) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index aa68672b3c0..740d7d2c8f9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -203,7 +203,7 @@ public class StreamThreadTest {
     private final ChangelogReader changelogReader = new MockChangelogReader();
     private StateDirectory stateDirectory = null;
     private final InternalTopologyBuilder internalTopologyBuilder = new 
InternalTopologyBuilder();
-    private final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder);
+    private final InternalStreamsBuilder internalStreamsBuilder = new 
InternalStreamsBuilder(internalTopologyBuilder, false);
 
     private StreamThread thread = null;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index b34d1408c56..52253f78b23 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -2622,7 +2622,7 @@ public class StreamsPartitionAssignorTest {
         builder = new CorruptedInternalTopologyBuilder();
         topologyMetadata = new TopologyMetadata(builder, new 
StreamsConfig(configProps(parameterizedConfig)));
 
-        final InternalStreamsBuilder streamsBuilder = new 
InternalStreamsBuilder(builder);
+        final InternalStreamsBuilder streamsBuilder = new 
InternalStreamsBuilder(builder, false);
 
         final KStream<String, String> inputTopic = 
streamsBuilder.stream(singleton("topic1"), new 
ConsumedInternal<>(Consumed.with(null, null)));
         final KTable<String, String> inputTable = 
streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, 
null)), new MaterializedInternal<>(Materialized.as("store")));

Reply via email to