The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor".

Not sure what you mean by this. The issue the tickets describe is that if you don't do a plain `put(key, value)` in your processor, stuff breaks right now. (Note that `delete(key)` and `put(key, null)` are the same.)


It's a known issue: bad API, and also bad documentation on our side, and I guess you can call it a bug if you wish. However, right now you can only use tombstones as deletes. Thus, what you do "wrong" is:

```java
if (record.value() == null == record.value().equals("deleteme")) {
    store.delete(record.key());
}
```

In your case you also delete if the value is not null and does not equal "deleteme", right? I.e., you use non-tombstone records as deletes, which is just not allowed/supported.

The issue is that during restore only `null` values, i.e., actual tombstones, are handled as deletes. Thus, if you delete a key using a non-tombstone record in your processor, this key can be resurrected during restore.
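A small, stdlib-only Java simulation can make this resurrection concrete. It is a sketch under simplifying assumptions, not Kafka Streams code: the topic is modeled as an ordered list of key/value records, `process` stands in for a custom processor that treats both `null` and the string "deleteme" as deletes, and `restore` stands in for a restore that bypasses the processor and only treats `null` as a delete. All class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation: normal processing vs. restore of a global store. */
final class RestoreDivergenceDemo {

    // Assumed application-level "soft delete" marker (mirrors "deleteme").
    static final String DELETE_MARKER = "deleteme";

    /** One topic record; SimpleEntry allows null values (tombstones). */
    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** Normal processing: the custom processor deletes on null AND on the marker. */
    static Map<String, String> process(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : topic) {
            if (r.getValue() == null || DELETE_MARKER.equals(r.getValue())) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    /** Restore: the processor is bypassed, so only real tombstones (null) delete. */
    static Map<String, String> restore(List<Map.Entry<String, String>> topic) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : topic) {
            if (r.getValue() == null) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = List.of(
                rec("foo", "v1"),
                rec("bar", "v2"),
                rec("foo", DELETE_MARKER)); // a non-tombstone "delete"
        System.out.println("processed: " + process(topic)); // processed: {bar=v2}
        System.out.println("restored:  " + restore(topic)); // restored:  {foo=deleteme, bar=v2}
    }
}
```

Real tombstones behave identically in both paths; only the marker record makes the two states diverge.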


I cannot explain why `all()` and `get(key)` actually give you different results with respect to `key`. If a key is resurrected during a restore, both methods should return it. Not sure why `get(key)` returns `null` even though `all()` contains the key... I would rather expect that both return the resurrected key.

Hope this helps.


-Matthias


On 12/8/22 12:00 PM, Patrick D’Addona wrote:
Hi,

I don't think this issue is exactly the same as KAFKA-7663.

The way I see it, KAFKA-7663 says, "a global store will be exactly the input topic after restore, regardless of the processor".
My issue here is that, after restore, the global store is inconsistent with the input topic and with itself.
It finds records with key "foo" using **store.all()** that it cannot find via **store.get("foo")**.
The result of **store.get()** is consistent with my input topic, where the tombstone is the latest entry for the key "foo", reflecting the **delete("foo")** operation on the store.
But still, looping over the store returns a record with "foo" as a key and a non-null value.

If the store acts like a Map, where you can call **get(k)** and **put(k, v)**, then looping over it should only find entries that actually exist and have a value when using **get(k)**.
A restore that breaks this connection seems wrong, even if that restore ignores the processor and writes directly to the store.
It should remove keys whose last entry is a tombstone from the **all()** iterator, regardless of whether the restore process uses a custom processor, as KAFKA-7663 wants, or simply reads the topic, as it currently does.

Kind Regards,
Patrick

________________________________
From: Colt McNealy <c...@littlehorse.io>
Sent: Thursday, December 8, 2022 17:54
To: patrick.dadd...@maibornwolff.de.invalid <patrick.dadd...@maibornwolff.de.invalid>
Cc: dev@kafka.apache.org <dev@kafka.apache.org>
Subject: Re: [Streams] GlobalStateManagerImpl#restoreState locally restores previously deleted records

Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As per that
issue/bug/discussion, if your processor does anything other than simply
passing records through, the results of initial processing vs. restoration
are different.

Global State Stores don't have a changelog topic (for example, in the
processor API, Global State Stores are only valid if the builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N streams instances, and if the processor on each
instance published to the changelog, then each put/delete would be written
N times, which is wasteful.

The implication is that your input topic should be "like" a
changelog:
- Your input topic should NOT have limited retention otherwise you'll lose
old data.
- Your input topic should ideally be compacted if possible
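A simplified, stdlib-only Java model of these two points follows (not Kafka code; all names are hypothetical). Compaction keeps the latest record per key, so replaying a compacted log yields the same state as replaying the full log, whereas dropping old records by retention loses keys that were never written again. Real compaction is more conservative than this sketch: it never touches the active segment, and tombstones are kept for `delete.retention.ms` before they are themselves removed.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of why a global store's input topic should behave like a changelog. */
final class ChangelogTopicDemo {

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** Replays a log the way a restore does: last value wins, null deletes. */
    static Map<String, String> replay(List<Map.Entry<String, String>> log) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            if (r.getValue() == null) {
                state.remove(r.getKey());
            } else {
                state.put(r.getKey(), r.getValue());
            }
        }
        return state;
    }

    /** Simplified compaction: keep only the latest record per key, in offset order (tombstones kept). */
    static List<Map.Entry<String, String>> compact(List<Map.Entry<String, String>> log) {
        Map<String, Integer> lastOffset = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastOffset.put(log.get(i).getKey(), i);
        }
        List<Map.Entry<String, String>> compacted = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            if (lastOffset.get(log.get(i).getKey()) == i) {
                compacted.add(log.get(i));
            }
        }
        return compacted;
    }

    /** Simplified limited retention: the n oldest records are simply gone. */
    static List<Map.Entry<String, String>> dropOldest(List<Map.Entry<String, String>> log, int n) {
        return new ArrayList<>(log.subList(Math.min(n, log.size()), log.size()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> log = List.of(
                rec("c", "1"), rec("a", "1"), rec("b", "1"), rec("a", "2"), rec("b", null));
        System.out.println(replay(log));                // {c=1, a=2}
        System.out.println(replay(compact(log)));       // {c=1, a=2}  same state
        System.out.println(replay(dropOldest(log, 2))); // {a=2}       key "c" is lost
    }
}
```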

I agree that the API as it stands is highly confusing: why allow users to
provide a processor if it offers a way to "shoot oneself in the foot"?

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose + implement such a KIP right now, but if you would
like to, feel free! (perhaps in the spring I may have time)

Your workaround (the init() method) is a good one. Another way to do it
might be to simply have a regular processing step which converts the input
topic into the true "changelog" format before you push it to a global store.
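A minimal, stdlib-only Java sketch of that normalization idea follows, assuming the application's delete marker is the string "deleteme" as in the handler discussed in this thread. The hypothetical `toChangelogValue` turns the marker into a real `null` tombstone, so that replaying the normalized log with plain put/delete semantics (which is all a restore does) deletes "foo" just as the marker-aware processor did. In an actual topology this step would presumably be a regular `KStream` using `mapValues(...)` and `to(...)` into a separate topic that then feeds the global store through a pass-through processor; that wiring and any topic names are assumptions, not something prescribed in this thread.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: normalize a raw input log into true changelog format. */
final class ChangelogNormalizationDemo {

    // Assumed application-level delete marker.
    static final String DELETE_MARKER = "deleteme";

    static Map.Entry<String, String> rec(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** The normalization step: the delete marker becomes a real tombstone (null). */
    static String toChangelogValue(String value) {
        return DELETE_MARKER.equals(value) ? null : value;
    }

    static List<Map.Entry<String, String>> normalize(List<Map.Entry<String, String>> raw) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> r : raw) {
            out.add(rec(r.getKey(), toChangelogValue(r.getValue())));
        }
        return out;
    }

    /** Restore-style replay (same as a pass-through processor): last value wins, null deletes. */
    static Map<String, String> replay(List<Map.Entry<String, String>> log) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : log) {
            if (r.getValue() == null) {
                store.remove(r.getKey());
            } else {
                store.put(r.getKey(), r.getValue());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> raw = List.of(
                rec("foo", "v1"), rec("bar", "v2"), rec("foo", DELETE_MARKER));
        System.out.println(normalize(raw));         // [foo=v1, bar=v2, foo=null]
        System.out.println(replay(normalize(raw))); // {bar=v2}
    }
}
```

The design choice is to keep the global store's own processor trivial, so that processing and restoring apply exactly the same operations.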

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona <patrick.dadd...@maibornwolff.de.invalid> wrote:

Hello,

I have a Quarkus application using
**org.apache.kafka:kafka-streams:3.1.0** and found that
* when creating a global table using a compacted topic as input,
* entries that have been deleted at some point
* are then no longer returned when iterating over the store with **store.all()** (as expected),
* but after the pod restarts, its Kafka Streams state directory is deleted, and the store is restored from the topic using **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**,
* those formerly deleted records are once again returned by that store when using **store.all()** (not expected),
* however, they return null when using **store.get("foo")** (as expected).
This is somewhat similar to https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to be able to modify this restore behaviour.
However, it is also different, because I think it is not documented anywhere and it is unintuitive (to me): it changes how the application behaves after a restart even if the Kafka cluster itself was not changed. So I think it's more of a bug than missing documentation.

Some more information: the topic is configured like this
```yaml
cleanup.policy: compact
compression.type: producer
delete.retention.ms: 86400000
max.compaction.lag.ms: 9223372036854775807
min.compaction.lag.ms: 0
retention.bytes: -1
retention.ms: 86400000
```

I am adding the global store like so
```java
streamsBuilder.addGlobalStore(
         Stores.timestampedKeyValueStoreBuilder(
                 Stores.persistentTimestampedKeyValueStore("foobar"),
                 Serdes.String(),
                 Serdes.String()),
         "foo.bar.globaltopic",
         Consumed.with(Serdes.String(), Serdes.String()),
         () -> new FooBarUpdateHandler(timeService)
);
```

And here is the definition of `FooBarUpdateHandler`:
```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal class handling partFamily updates.
 */
public class FooBarUpdateHandler implements Processor<String, String, Void, Void> {

    private static final Logger logger = LoggerFactory.getLogger(FooBarUpdateHandler.class);
    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("foobar");
    }

    @Override
    public void process(final Record<String, String> record) {

        // handle tombstones from input topic
        if (record.value() == null == record.value().equals("deleteme")) {
            store.delete(record.key());
        } else {
            store.put(
                    record.key(),
                    ValueAndTimestamp.make(
                            record.key(),
                            Instant.now().toEpochMilli()
                    )
            );
        }

        // this is not relevant
        // it's only here to show the issue when restarting and restoring the store
        final List<String> existingKeys = new ArrayList<>();
        try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
            all.forEachRemaining((r) -> {
                existingKeys.add(r.key);
            });
        }
        logger.info("Got {} records in the store, with keys {}",
                existingKeys.size(), String.join(",", existingKeys));
    }
}
```

My workaround is to add this to the `init` method of `FooBarUpdateHandler`:
```java
try (final KeyValueIterator<String, ValueAndTimestamp<String>> all = store.all()) {
    if (all == null) {
        return;
    }
    // storeName is not defined in the class above; it is assumed to hold the store's name ("foobar")
    logger.info("Removing already deleted records from rocksdb representing the global store {}", storeName);
    // delete every key that all() still returns but get() no longer finds
    all.forEachRemaining(r -> {
        if (r != null && r.key != null && store.get(r.key) == null) {
            store.delete(r.key);
        }
    });
}
```
Now it is again consistent across restarts.

Kind Regards,
Patrick


Patrick D’Addona
Senior Lead IT Architect


Mobile: +49 151 544 22 161
patrick.dadd...@maibornwolff.de
Theresienhöhe 13, 80339 München

MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
www.maibornwolff.de, Phone +49 89 544 253 000
USt-ID DE 129 299 525, Munich District Court HRB 98058
Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
____________________________________________________________



