Florens Pauwels created KAFKA-19602:
---------------------------------------

             Summary: Kafka Streams join after unmaterialized transformValues on KTable with extra store fails
                 Key: KAFKA-19602
                 URL: https://issues.apache.org/jira/browse/KAFKA-19602
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.9.1, 3.6.1
            Reporter: Florens Pauwels


I believe for this to occur you need
# transformValues on a KTable, followed by a KTable join or leftJoin
# The transformValues is not materialized (no store name given)
# The transformValues accesses at least one extra store

Tested on 3.6.1 and 3.9.1

Example code:
{code:java}
@Component
class TestCase {
    // Extra store that the transformer accesses
    private static final StoreBuilder<TimestampedKeyValueStore<String, String>> TRANSFORMER_STORE =
            Stores.timestampedKeyValueStoreBuilder(
                    Stores.persistentTimestampedKeyValueStore("transformer-store"),
                    Serdes.String(),
                    Serdes.String()
            );

    private final StreamsBuilder streamsBuilder;

    TestCase(StreamsBuilder streamsBuilder) {
        this.streamsBuilder = streamsBuilder;
    }

    @PostConstruct
    void configure() {
        streamsBuilder.addStateStore(TRANSFORMER_STORE);

        var aggregateTable = streamsBuilder
                .stream("input", Consumed.with(Serdes.String(), Serdes.String()).withName("input-to-stream"))
                .toTable(Named.as("to-table"), MaterializedAs.keyValue("aggregate-store", Serdes.String(), Serdes.String()))
                // Not materialized: serdes only, no store name given
                .transformValues(MyTransformer::new, Materialized.with(Serdes.String(), Serdes.String()), Named.as("my-transformer"), TRANSFORMER_STORE.name());

        aggregateTable
                .join(aggregateTable, (value, _) -> value, Named.as("after-transformer"),
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("after-transformer-store")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.String()))
                .toStream(Named.as("aggregate-to-stream"))
                .to("output", Produced.with(Serdes.String(), Serdes.String()).withName("output-to-topic"));

        System.out.println(streamsBuilder.build().describe().toString());
    }

    private static class MyTransformer implements ValueTransformerWithKey<String, String, String> {
        @Override
        public void init(ProcessorContext context) {
            // This lookup is the call that throws in the stack trace below
            context.getStateStore(TRANSFORMER_STORE.name());
        }

        @Override
        public String transform(String readOnlyKey, String value) {
            return value;
        }

        @Override
        public void close() {
        }
    }
}
{code}
Result of the above code:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor after-transformer-join-this
	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:131) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:140) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1089) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:295) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:980) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:1055) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:920) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1191) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:999) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:713) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672) ~[kafka-streams-3.9.1.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor after-transformer-join-this has no access to StateStore transformer-store as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:174) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext.getStateStore(ForwardingDisabledProcessorContext.java:90) ~[kafka-streams-3.9.1.jar:na]
	at be.florens.kafkaspringtest.selfjoin.TestCase$MyTransformer.init(TestCase.java:63) ~[main/:na]
	at org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesGetter.init(KTableTransformValues.java:156) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.init(KTableKTableInnerJoin.java:83) ~[kafka-streams-3.9.1.jar:na]
	at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:123) ~[kafka-streams-3.9.1.jar:na]
	... 10 common frames omitted
{noformat}
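
Reading the stack trace, MyTransformer.init() appears to be called from KTableTransformValuesGetter.init() inside the join processor, so the getStateStore() lookup is checked against the stores connected to after-transformer-join-this rather than to my-transformer. The following is a plain-Java toy model of that check only, a sketch and not Kafka Streams code: ToyTopology, ToyContext, ToyTransformer and initTransformerUnder are hypothetical names invented for illustration, and the real internals may differ.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model, NOT Kafka Streams code: every class and method name here is hypothetical. */
public class StoreConnectionSketch {

    /** Records which store names each named processor has been connected to. */
    static final class ToyTopology {
        private final Map<String, Set<String>> connections = new HashMap<>();

        void connect(String processor, String store) {
            connections.computeIfAbsent(processor, p -> new HashSet<>()).add(store);
        }

        /** Returns a context bound to one processor and its connected stores. */
        ToyContext contextFor(String processor) {
            return new ToyContext(processor, connections.getOrDefault(processor, Set.of()));
        }
    }

    /** Store lookups are allowed only for stores connected to the owning processor. */
    static final class ToyContext {
        private final String processor;
        private final Set<String> connectedStores;

        ToyContext(String processor, Set<String> connectedStores) {
            this.processor = processor;
            this.connectedStores = connectedStores;
        }

        String getStateStore(String store) {
            if (!connectedStores.contains(store)) {
                throw new IllegalStateException(
                        "Processor " + processor + " has no access to StateStore " + store);
            }
            return store;
        }
    }

    /** Stand-in for a transformer whose init() looks up an extra store. */
    static final class ToyTransformer {
        void init(ToyContext context) {
            context.getStateStore("transformer-store");
        }
    }

    /** Initializes a transformer under the given processor's context; returns null on success, else the error message. */
    static String initTransformerUnder(ToyTopology topology, String processor) {
        try {
            new ToyTransformer().init(topology.contextFor(processor));
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        // Only the transformValues node is connected to the extra store.
        topology.connect("my-transformer", "transformer-store");

        // Init under the transformer's own context succeeds (prints null).
        System.out.println(initTransformerUnder(topology, "my-transformer"));
        // Init under the join processor's context fails (prints the error message).
        System.out.println(initTransformerUnder(topology, "after-transformer-join-this"));
    }
}
```

In this model the same init() succeeds or fails depending only on which processor's context it runs under, which matches the shape of the reported error. It does not show why an unmaterialized transformValues is initialized by the join processor in the first place.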