> The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic
> after restore, regardless of the processor"

Not sure what you mean by this? The issue the tickets describe is that if your processor does anything other than a plain `put(key, value)`, things break right now. (Note that `delete(key)` and `put(key, null)` are the same.)

It's a known issue: bad API, and also bad documentation on our side, and I guess you can call it a bug if you wish. However, right now you can only use tombstones as deletes. Thus, what you do "wrong" is:

if (record.value() == null == record.value().equals("deleteme")) {

In your case you also delete if the value is not null and does not equal "deleteme", right? I.e., you use non-tombstone records as deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, i.e., actual tombstones, are handled as deletes. Thus, if you delete a key using a non-tombstone record in your processor, this key can be resurrected during restore.
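The difference between the two paths can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions: it does not use the Kafka Streams API, a `TreeMap` stands in for the store, and the names `RestoreModel`, `processLive`, and `restore` are hypothetical. `processLive` plays the role of a processor that also treats the non-tombstone value `"deleteme"` as a delete, while `restore` copies the topic into the store the way restore does, deleting only on `null`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, not Kafka API: live processing vs. restore from the input topic. */
class RestoreModel {

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** Input topic: "bar" is deleted with a real tombstone, "foo" with a non-tombstone marker. */
    static List<Map.Entry<String, String>> sampleTopic() {
        return List.of(
                rec("foo", "v1"),
                rec("bar", "v2"),
                rec("bar", null),          // real tombstone
                rec("foo", "deleteme"));   // non-tombstone "delete"
    }

    /** Live path: a custom processor that also treats the marker value as a delete. */
    static Map<String, String> processLive(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new TreeMap<>();
        for (Map.Entry<String, String> r : topic) {
            if (r.getValue() == null || r.getValue().equals("deleteme")) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    /** Restore path: the processor is bypassed and only null values (tombstones) delete. */
    static Map<String, String> restore(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new TreeMap<>();
        for (Map.Entry<String, String> r : topic) {
            if (r.getValue() == null) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println("live:     " + processLive(sampleTopic()));
        System.out.println("restored: " + restore(sampleTopic()));
    }
}
```

Running `main` prints an empty map for the live path and `{foo=deleteme}` for the restored one: the key deleted through a non-tombstone record comes back, while `bar`, deleted with a real tombstone, stays deleted.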

I cannot explain why `all()` and `get(key)` actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even if `all()` contains the key... I would rather expect that both return the resurrected key.

Hope this helps.


On 12/8/22 12:00 PM, Patrick D’Addona wrote:

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic 
after restore, regardless of the processor"
My issue here is that, after restore, the global store is inconsistent with the
input topic and with itself,
because it finds records with key "foo" using **store.all()** that it cannot find via
**store.get()**. The **store.get()** result is consistent with my input topic, where the tombstone is the latest entry for
the key "foo", reflecting the **delete("foo")** operation on the store.
But still, looping over the store returns a record with "foo" as a key and a
non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**, 
then looping over it should only find entries that actually exist and have a
value when using **get(k)**.
Restoring something that breaks this connection seems wrong, even if that
restoring ignores the processor and directly writes to the store.
It should remove keys for which the last entry is a tombstone from the
**all()** iterator, regardless of whether the restore process uses a custom
processor, as KAFKA-7663 wants, or simply reads the topic, as it currently does.

Kind Regards,

From: Colt McNealy
Sent: Thursday, December 8, 2022 17:54
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores 
previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
passing records through, the results of initial processing vs. restoration
can differ.
Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implication of this is that your input topic should be "like" a changelog:
- Your input topic should NOT have limited retention, otherwise you'll lose
old data.
- Your input topic should ideally be compacted, if possible.
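Why both points matter can be sketched with another toy model in plain Java. It is not Kafka API: `RetentionModel` and its methods are hypothetical, and compaction is simplified to "keep only the latest record per key", whereas real compaction runs in the background and only guarantees at least that much. Replaying a compacted log yields the same store as replaying the full log, while dropping the oldest records, as time- or size-based retention does, loses keys that were never updated again.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, not Kafka API: restoring a store by replaying its input topic. */
class RetentionModel {

    record Rec(String key, String value) {}

    /** Rebuild the store by replaying the log in order; null values are tombstones. */
    static Map<String, String> replay(List<Rec> log) {
        Map<String, String> store = new TreeMap<>();
        for (Rec r : log) {
            if (r.value() == null) {
                store.remove(r.key());
            } else {
                store.put(r.key(), r.value());
            }
        }
        return store;
    }

    /** Simplified compaction: keep only the latest record per key, preserving log order. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // re-insert so the entry moves to its latest position
            latest.put(r.key(), r);
        }
        return new ArrayList<>(latest.values());
    }

    /** Time- or size-based retention: the oldest records disappear regardless of key. */
    static List<Rec> dropOldest(List<Rec> log, int count) {
        return new ArrayList<>(log.subList(count, log.size()));
    }

    static List<Rec> sampleLog() {
        return List.of(new Rec("a", "1"), new Rec("b", "2"), new Rec("a", "3"), new Rec("c", "4"));
    }
}
```

With the sample log, `replay(log)` and `replay(compact(log))` both give `{a=3, b=2, c=4}`, whereas `replay(dropOldest(log, 2))` gives `{a=3, c=4}`: key `b` is gone because its only record aged out.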

I agree that the API as it stands is highly confusing—why allow users to
provide a processor if it offers a way to "shoot oneself in one's foot?"

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step which converts the input
topic into the true "changelog" format before you push it to a global store.
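A minimal sketch of that idea, again as a plain-Java toy model rather than Kafka Streams code (the class name, the `DELETE_MARKER` constant, and the method names are assumptions for illustration): the raw input is first normalized so that the marker value becomes a real tombstone, after which a processor that only does plain put/delete and the restore path see exactly the same records and therefore build the same store.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model, not Kafka API: normalize raw input into changelog form before the global store. */
class ChangelogConversion {

    /** Hypothetical marker value that producers use to mean "delete this key". */
    static final String DELETE_MARKER = "deleteme";

    /** Turn the marker into a real tombstone (null); all other values pass through unchanged. */
    static String toChangelogValue(String value) {
        return DELETE_MARKER.equals(value) ? null : value;
    }

    /** The "regular processing step": rewrite every record of the raw topic. */
    static List<Map.Entry<String, String>> toChangelog(List<Map.Entry<String, String>> raw) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : raw) {
            out.add(new SimpleEntry<>(r.getKey(), toChangelogValue(r.getValue())));
        }
        return out;
    }

    /** Plain put/delete, which is what both a pass-through processor and restore do. */
    static Map<String, String> apply(List<Map.Entry<String, String>> changelog) {
        Map<String, String> store = new TreeMap<>();
        for (Map.Entry<String, String> r : changelog) {
            if (r.getValue() == null) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }
}
```

In Kafka Streams, this step would typically be a `mapValues(...)` on a `KStream` read from the raw topic, followed by `to(...)` into a separate, compacted topic that the global store then consumes; topic names and serdes are up to you.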

Colt McNealy

On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona wrote:


I have a Quarkus application using
**org.apache.kafka:kafka-streams:3.1.0** and found that
* when creating a global table using a compacted topic as input,
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with
**store.all()** - as expected
* but after the pod restarts and its Kafka Streams state directory is
deleted, after restoring from the topic using **GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store
when using **store.all()** - not expected
* however, they return null when using **store.get("foo")** - as expected

This is somewhat similar to KAFKA-7663, in that I would like to
be able to modify this restore behaviour.
However, it is also different, because I think it is not documented
anywhere and it is unintuitive (to me), since it changes how the
application behaves after a restart even if the Kafka cluster itself
was not changed. So I think it's more of a bug than missing documentation.

Some more information: the topic is configured like this:
cleanup.policy: compact
compression.type: producer 86400000 9223372036854776000 0
retention.bytes: -1 86400000

I am adding the global store like so
         Consumed.with(Serdes.String(), Serdes.String()),
         () -> new FooBarUpdateHandler(timeService)

and here is the definition of 'FooBarUpdateHandler'
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null == record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            // ... put the new value (with a timestamp) into the store
        }

        // this is not relevant
        // it's only to show the issue when restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining((r) -> existingKeys.add(r.key));
        }
        logger.info("Got {} records in the store, with keys {}",
                existingKeys.size(), String.join(",", existingKeys));
    }
}

My workaround is to add this to the 'init' method of the 'FooBarUpdateHandler':

try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}

Now it is again consistent across restarts.
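The logic of this workaround can be factored into a small, store-agnostic helper. The sketch below is plain Java; `StaleKeyCleanup` and `removeStaleKeys` are hypothetical names, and the three functional parameters stand in for iterating the keys of `store.all()`, calling `store.get(key)`, and calling `store.delete(key)`. Unlike the snippet above, it collects the stale keys first and deletes them after the iteration finishes, a conservative choice that avoids modifying the store while an iterator over it is still open.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Sketch of the init()-time cleanup: drop keys that iteration returns but point lookups cannot find. */
final class StaleKeyCleanup {

    private StaleKeyCleanup() {
    }

    /**
     * @param keysFromAll keys as returned by iterating the whole store
     * @param get         point lookup for a single key
     * @param delete      removes a single key from the store
     * @return the keys that were deleted
     */
    static <K, V> List<K> removeStaleKeys(Iterator<K> keysFromAll, Function<K, V> get, Consumer<K> delete) {
        List<K> stale = new ArrayList<>();
        // First pass: only read, so the store is not modified while the iterator is open.
        while (keysFromAll.hasNext()) {
            K key = keysFromAll.next();
            if (key != null && get.apply(key) == null) {
                stale.add(key);
            }
        }
        // Second pass: delete the keys that iteration returned but lookups could not find.
        stale.forEach(delete);
        return stale;
    }
}
```

With Kafka Streams, the key iterator could be built by reading `r.key` from each element of the `KeyValueIterator` returned by `store.all()` (closing it afterwards), and `store::get` and `store::delete` could be passed as the other two arguments.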

Kind Regards,

Patrick D’Addona
Senior Lead IT Architect

