This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push: new d3b3aa5c352 KAFKA-19668: processValue() must be declared as value-changing operation (#20470) d3b3aa5c352 is described below commit d3b3aa5c3520b7ab85c1983d8b3147f0d064a5e5 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Fri Sep 5 18:00:24 2025 -0700 KAFKA-19668: processValue() must be declared as value-changing operation (#20470) With "merge.repartition.topic" optimization enabled, Kafka Streams tries to push repartition topics upstream, to be able to merge multiple repartition topics from different downstream branches together. However, it is not safe to push a repartition topic if the parent node is value-changing: because of potentially changing data types, the topology might become invalid, and fail with a serde error at runtime. The optimization itself works correctly; however, processValues() is not correctly declared as value-changing, which can lead to invalid topologies. Reviewers: Bill Bejeck <b...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io> --- .../org/apache/kafka/streams/StreamsBuilder.java | 11 +- .../org/apache/kafka/streams/TopologyConfig.java | 23 +++ .../kstream/internals/InternalStreamsBuilder.java | 8 +- .../streams/kstream/internals/KStreamImpl.java | 13 +- .../internals/InternalStreamsBuilderTest.java | 6 +- .../internals/MaterializedInternalTest.java | 6 +- .../kstream/internals/graph/StreamsGraphTest.java | 182 +++++++++++++++++++-- .../internals/RepartitionOptimizingTest.java | 35 ++++ .../processor/internals/StreamThreadTest.java | 2 +- .../internals/StreamsPartitionAssignorTest.java | 2 +- 10 files changed, 265 insertions(+), 23 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 7037e8d7fd3..02b199c0c74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -75,7 +75,7 @@ public class StreamsBuilder { public StreamsBuilder() { topology = new Topology(); internalTopologyBuilder = topology.internalTopologyBuilder; - internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false); } /** @@ -87,7 +87,14 @@ public class StreamsBuilder { public StreamsBuilder(final TopologyConfig topologyConfigs) { topology = newTopology(topologyConfigs); internalTopologyBuilder = topology.internalTopologyBuilder; - internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + internalStreamsBuilder = new InternalStreamsBuilder( + internalTopologyBuilder, + TopologyConfig.InternalConfig.getBoolean( + topologyConfigs.originals(), + TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, + false + ) + ); } protected Topology newTopology(final TopologyConfig topologyConfigs) { diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java index e96d5281d09..be43da321bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.state.DslStoreSuppliers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.function.Supplier; @@ -84,6 +85,28 @@ import static org.apache.kafka.streams.internals.StreamsConfigUtils.totalCacheSi */ @SuppressWarnings("deprecation") public final class TopologyConfig extends AbstractConfig { + + public static class InternalConfig { + // Cf https://issues.apache.org/jira/browse/KAFKA-19668 + public static final String ENABLE_PROCESS_PROCESSVALUE_FIX = "__enable.process.processValue.fix__"; + + public 
static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) { + final Object value = configs.getOrDefault(key, defaultValue); + if (value instanceof Boolean) { + return (boolean) value; + } else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } else { + log.warn( + "Invalid value ({}) on internal configuration '{}'. Please specify a true/false value.", + value, + key + ); + return defaultValue; + } + } + } + private static final ConfigDef CONFIG; static { CONFIG = new ConfigDef() diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 954b88bfbea..e0d9ce12a23 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -72,6 +72,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { private static final String TABLE_SOURCE_SUFFIX = "-source"; final InternalTopologyBuilder internalTopologyBuilder; + private final boolean processProcessValueFixEnabled; private final AtomicInteger index = new AtomicInteger(0); private final AtomicInteger buildPriorityIndex = new AtomicInteger(0); @@ -91,8 +92,10 @@ public class InternalStreamsBuilder implements InternalNameProvider { } }; - public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder) { + public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuilder, + final boolean processProcessValueFixEnabled) { this.internalTopologyBuilder = internalTopologyBuilder; + this.processProcessValueFixEnabled = processProcessValueFixEnabled; } public <K, V> KStream<K, V> stream(final Collection<String> topics, @@ -706,4 +709,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { 
return internalTopologyBuilder; } + public boolean processProcessValueFixEnabled() { + return processProcessValueFixEnabled; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7beaa1abffb..a5b20d2d20a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1235,7 +1235,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), - stateStoreNames); + stateStoreNames + ); + if (builder.processProcessValueFixEnabled()) { + processNode.keyChangingOperation(true); + processNode.setValueChangingOperation(true); + } builder.addGraphNode(graphNode, processNode); @@ -1280,7 +1285,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K final ProcessorToStateConnectorNode<? super K, ? 
super V> processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), - stateStoreNames); + stateStoreNames + ); + if (builder.processProcessValueFixEnabled()) { + processNode.setValueChangingOperation(true); + } builder.addGraphNode(graphNode, processNode); // cannot inherit value serde diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index c761d1eda7c..8db9441d89f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -81,7 +81,7 @@ public class InternalStreamsBuilderTest { private static final String APP_ID = "app-id"; - private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.with(null, null)); private final String storePrefix = "prefix-"; private final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as("test-store"), builder, storePrefix); @@ -93,7 +93,7 @@ public class InternalStreamsBuilderTest { assertEquals("Y-0000000001", builder.newProcessorName("Y-")); assertEquals("Z-0000000002", builder.newProcessorName("Z-")); - final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); assertEquals("X-0000000000", newBuilder.newProcessorName("X-")); assertEquals("Y-0000000001", newBuilder.newProcessorName("Y-")); @@ -106,7 +106,7 @@ 
public class InternalStreamsBuilderTest { assertEquals("Y-STATE-STORE-0000000001", builder.newStoreName("Y-")); assertEquals("Z-STATE-STORE-0000000002", builder.newStoreName("Z-")); - final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder()); + final InternalStreamsBuilder newBuilder = new InternalStreamsBuilder(new InternalTopologyBuilder(), false); assertEquals("X-STATE-STORE-0000000000", newBuilder.newStoreName("X-")); assertEquals("Y-STATE-STORE-0000000001", newBuilder.newStoreName("Y-")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java index 24600c57fec..a3eaadd27db 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java @@ -95,7 +95,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); @@ -113,7 +113,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final 
MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); @@ -129,7 +129,7 @@ public class MaterializedInternalTest { final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( new TopologyConfig("my-topology", config, topologyOverrides)); - final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder); + final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(topologyBuilder, false); final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized = new MaterializedInternal<>(Materialized.as(supplier), internalStreamsBuilder, prefix); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index e046ba19533..1675619480f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Branched; import org.apache.kafka.streams.kstream.Consumed; @@ -36,6 +37,8 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; import 
org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; @@ -127,7 +130,7 @@ public class StreamsGraphTest { initializer = () -> ""; aggregator = (aggKey, value, aggregate) -> aggregate + value.length(); final ProcessorSupplier<String, String, String, String> processorSupplier = - () -> new Processor<String, String, String, String>() { + () -> new Processor<>() { private ProcessorContext<String, String> context; @Override @@ -185,14 +188,163 @@ public class StreamsGraphTest { } @Test - public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { + public void shouldPartiallyOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixDisabled() { + final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, false); + final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, false); + + System.out.println(attemptedOptimize.describe().toString()); + System.out.println(noOptimization.describe().toString()); + assertEquals("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000014\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000006\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000022\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: 
KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KSTREAM-MAPVALUES-0000000002\n" + + " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" + + " --> KSTREAM-SINK-0000000013\n" + + " <-- KSTREAM-FLATMAPVALUES-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- KSTREAM-PROCESSVALUES-0000000018\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" + + " <-- KSTREAM-FILTER-0000000014\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" + + " <-- KSTREAM-FILTER-0000000022\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000004\n" + + " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" + + " --> KTABLE-TOSTREAM-0000000008\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" + + " --> KSTREAM-SINK-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000008\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000012\n" + + " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" + + " --> KTABLE-TOSTREAM-0000000016\n" + + " <-- KSTREAM-SOURCE-0000000015\n" + + " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- KSTREAM-AGGREGATE-0000000012\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" + + " 
<-- KTABLE-TOSTREAM-0000000016\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000020\n" + + " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" + + " --> KTABLE-TOSTREAM-0000000024\n" + + " <-- KSTREAM-SOURCE-0000000023\n" + + " Processor: KTABLE-TOSTREAM-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- KSTREAM-AGGREGATE-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000024\n" + + "\n", + noOptimization.describe().toString() + ); + assertEquals("Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n" + + " --> KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])\n" + + " --> KSTREAM-FLATMAPVALUES-0000000010, KSTREAM-MAPVALUES-0000000002, KSTREAM-PROCESSVALUES-0000000018\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FLATMAPVALUES-0000000010 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000014\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000006\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-PROCESSVALUES-0000000018 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000022\n" + + " <-- KSTREAM-KEY-SELECT-0000000001\n" + + " Processor: KSTREAM-FILTER-0000000006 (stores: [])\n" + + " --> KSTREAM-SINK-0000000005\n" + + " <-- KSTREAM-MAPVALUES-0000000002\n" + + " Processor: KSTREAM-FILTER-0000000014 (stores: [])\n" + + " --> KSTREAM-SINK-0000000013\n" + + " <-- KSTREAM-FLATMAPVALUES-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000022 (stores: [])\n" + + " --> KSTREAM-SINK-0000000021\n" + + " <-- KSTREAM-PROCESSVALUES-0000000018\n" + + " Sink: KSTREAM-SINK-0000000005 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n" + + " <-- KSTREAM-FILTER-0000000006\n" + + " Sink: KSTREAM-SINK-0000000013 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n" + + " <-- KSTREAM-FILTER-0000000014\n" + + " Sink: KSTREAM-SINK-0000000021 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition)\n" + + " <-- KSTREAM-FILTER-0000000022\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000004\n" + + " Processor: KSTREAM-AGGREGATE-0000000004 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000003])\n" + + " --> KTABLE-TOSTREAM-0000000008\n" + + " <-- KSTREAM-SOURCE-0000000007\n" + + " Processor: KTABLE-TOSTREAM-0000000008 (stores: [])\n" + + " --> KSTREAM-SINK-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000004\n" + + " Sink: KSTREAM-SINK-0000000009 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000008\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000012\n" + + " Processor: KSTREAM-AGGREGATE-0000000012 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000011])\n" + + " --> KTABLE-TOSTREAM-0000000016\n" + + " <-- KSTREAM-SOURCE-0000000015\n" + + " Processor: KTABLE-TOSTREAM-0000000016 (stores: [])\n" + + " --> KSTREAM-SINK-0000000017\n" + + " <-- KSTREAM-AGGREGATE-0000000012\n" + + " Sink: KSTREAM-SINK-0000000017 (topic: windowed-output)\n" + + " <-- KTABLE-TOSTREAM-0000000016\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000023 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000019-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000020\n" + + " Processor: KSTREAM-AGGREGATE-0000000020 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000019])\n" + + " --> KTABLE-TOSTREAM-0000000024\n" + + " <-- KSTREAM-SOURCE-0000000023\n" + + " Processor: KTABLE-TOSTREAM-0000000024 
(stores: [])\n" + + " --> KSTREAM-SINK-0000000025\n" + + " <-- KSTREAM-AGGREGATE-0000000020\n" + + " Sink: KSTREAM-SINK-0000000025 (topic: output)\n" + + " <-- KTABLE-TOSTREAM-0000000024\n\n", + noOptimization.describe().toString() + ); + assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + } - final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE); - final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION); + @Test + public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChangeWithFixEnabled() { + final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE, true); + final Topology noOptimization = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION, true); assertEquals(attemptedOptimize.describe().toString(), noOptimization.describe().toString()); - assertEquals(2, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); - assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(attemptedOptimize.describe().toString())); + assertEquals(3, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } @Test @@ -227,20 +379,30 @@ public class StreamsGraphTest { assertEquals(2, getCountOfRepartitionTopicsFound(noOptimization.describe().toString())); } - private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) { - - final StreamsBuilder builder = new StreamsBuilder(); + private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig, + final boolean enableFix) { final Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"application-id"); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizeConfig); + properties.put(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, enableFix); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties))); final KStream<String, String> inputStream = builder.stream("input"); final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v); mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output"); mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output"); + mappedKeyStream.processValues( + () -> new ContextualFixedKeyProcessor<>() { + @Override + public void process(final FixedKeyRecord<String, String> record) { + context().forward(record.withValue(record.value().toUpperCase(Locale.getDefault()))); + } + }).groupByKey().count().toStream().to("output"); return builder.build(properties); - } private Topology getTopologyWithRepartitionOperation(final String optimizeConfig) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java index 91b95b87a7f..0d453b05271 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyConfig; import 
org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; @@ -42,6 +43,8 @@ import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.Stores; @@ -224,6 +227,38 @@ public class RepartitionOptimizingTest { assertThat(joinedOutputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedJoinKeyValues))); } + @Test + public void shouldNotPushRepartitionAcrossValueChangingOperation() { + streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); + streamsConfiguration.setProperty(TopologyConfig.InternalConfig.ENABLE_PROCESS_PROCESSVALUE_FIX, "true"); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(streamsConfiguration))); + + builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream")) + .map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v)) + .processValues(() -> new ContextualFixedKeyProcessor<String, String, Integer>() { + @Override + public void process(final FixedKeyRecord<String, String> record) { + context().forward(record.withValue(record.value().length())); + } + }) + .groupByKey(Grouped.valueSerde(new Serdes.IntegerSerde())) + .reduce(Integer::sum) + .toStream() + .to(AGGREGATION_TOPIC); + + final Topology topology = builder.build(streamsConfiguration); + + topologyTestDriver = new TopologyTestDriver(topology, streamsConfiguration); + + final TestInputTopic<String, String> inputTopic = 
topologyTestDriver.createInputTopic(INPUT_TOPIC, stringSerializer, stringSerializer); + final TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, stringDeserializer, new IntegerDeserializer()); + + inputTopic.pipeKeyValueList(getKeyValues()); + + assertThat(outputTopic.readKeyValuesToMap(), equalTo(keyValueListToMap(expectedAggKeyValues))); + } + private <K, V> Map<K, V> keyValueListToMap(final List<KeyValue<K, V>> keyValuePairs) { final Map<K, V> map = new HashMap<>(); for (final KeyValue<K, V> pair : keyValuePairs) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d3a05720cd7..e5b3c0b1194 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -192,7 +192,7 @@ public class StreamThreadTest { private final ChangelogReader changelogReader = new MockChangelogReader(); private StateDirectory stateDirectory = null; private final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(); - private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder, false); private StreamThread thread = null; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 29fa204a579..0d65c4fa837 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -2623,7 +2623,7 @@ public class StreamsPartitionAssignorTest { builder = new CorruptedInternalTopologyBuilder(); topologyMetadata = new TopologyMetadata(builder, new StreamsConfig(configProps(parameterizedConfig))); - final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder); + final InternalStreamsBuilder streamsBuilder = new InternalStreamsBuilder(builder, false); final KStream<String, String> inputTopic = streamsBuilder.stream(singleton("topic1"), new ConsumedInternal<>(Consumed.with(null, null))); final KTable<String, String> inputTable = streamsBuilder.table("topic2", new ConsumedInternal<>(Consumed.with(null, null)), new MaterializedInternal<>(Materialized.as("store")));