Abdullah alkhawatrah created KAFKA-15595:
--------------------------------------------
Summary: Session window aggregate drops records headers
Key: KAFKA-15595
URL: https://issues.apache.org/jira/browse/KAFKA-15595
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.5.1
Reporter: Abdullah alkhawatrah
Hey,
While upgrading from 3.2.X to 3.5.1 I noticed a change in SessionWindow
aggregate behaviour: custom headers added before the aggregate now appear
to be dropped.
I could reproduce the behaviour with the following test topology:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(inputTopic, Consumed.with(EARLIEST))
    .process(() -> new Processor<Object, Object, Object, Object>() {
        private ProcessorContext<Object, Object> context;

        @Override
        public void init(final ProcessorContext<Object, Object> context) {
            this.context = context;
        }

        @Override
        public void process(Record<Object, Object> record) {
            record.headers().add("key1", record.value().toString().getBytes());
            context.forward(record);
        }
    })
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
    .aggregate(() -> 1,
        (key, value, aggregate) -> aggregate,
        (aggKey, aggOne, aggTwo) -> aggTwo)
    .toStream()
    .map((key, value) -> new KeyValue<>(key.key(), value))
    .to(outputTopic);
{code}
Checking events in the `outputTopic` shows that the headers are empty. With 3.2.*
the same topology would have propagated the headers.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)