Hi Patrick,

Your issue is in fact identical to KAFKA-7663. As discussed in that
issue, if your processor does anything other than simply pass records
through, the results of initial processing and of restoration will
differ.

Global State Stores don't have a changelog topic (for example, in the
Processor API, global state stores are only valid if the store builder has
.withLoggingDisabled()). That's because the processor for the global store
runs on each of your N Streams instances; if the processor on each
instance published to a changelog, every put/delete would be written
N times, which is wasteful.
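To illustrate, here is a minimal sketch (the topic name, store name, and class names are mine, not from your code) of a global store wired up with logging explicitly disabled and a simple pass-through handler:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreSketch {

    // Minimal pass-through handler: writes every record straight into the store.
    static class PassThrough implements Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<Void, Void> context) {
            store = context.getStateStore("global-store");
        }

        @Override
        public void process(final Record<String, String> record) {
            if (record.value() == null) {
                store.delete(record.key());   // honour tombstones
            } else {
                store.put(record.key(), record.value());
            }
        }
    }

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
                // logging stays disabled: there is no changelog topic to write to
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("global-store"),
                        Serdes.String(),
                        Serdes.String()).withLoggingDisabled(),
                "global-input-topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                PassThrough::new);
        return builder.build();
    }
}
```

Because the handler is a pure pass-through, processing and restoration produce the same store contents, which is exactly the property KAFKA-7663 is about.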

The implication is that your input topic should itself be "like" a
changelog:
- Your input topic should NOT have limited retention, otherwise you'll
lose old data.
- Your input topic should ideally be compacted, if possible.
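For example, a changelog-like topic definition could look like this (the partition and replication counts are made up; the config keys are the standard topic-level settings):

```java
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class ChangelogLikeTopic {

    // Changelog-like settings: compaction on, no time- or size-based deletion.
    public static NewTopic define() {
        return new NewTopic("foo.bar.globaltopic", 1, (short) 3)
                .configs(Map.of(
                        TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                        TopicConfig.RETENTION_MS_CONFIG, "-1",
                        TopicConfig.RETENTION_BYTES_CONFIG, "-1"));
    }
}
```

With retention.ms=-1 and cleanup.policy=compact, old keys survive indefinitely (only superseded values are cleaned up), so a restore replays the full keyed history.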

I agree that the API as it stands is highly confusing: why allow users to
provide a processor if it lets them shoot themselves in the foot?

Changing that API would probably require a KIP. I don't quite have the
bandwidth to propose and implement such a KIP right now, but if you would
like to, feel free! (Perhaps in the spring I may have time.)

Your workaround (cleaning up in the init() method) is a good one. Another
option is a regular processing step that converts the input topic into a
true "changelog" topic before you feed it to the global store.
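A sketch of that second option (topic names are hypothetical): one regular sub-topology normalizes the raw input, turning application-level delete markers into real tombstones, and writes the result to a changelog-formatted topic; the global store then consumes that topic with a plain pass-through processor.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class ChangelogConversion {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Normalize the raw input: turn application-level delete markers into
        // real tombstones (null values), so the downstream topic is a true
        // changelog and restores exactly the way it processes.
        builder.stream("raw-input", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> "deleteme".equals(value) ? null : value)
               .to("changelog-formatted", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Since the global store's processor is then a pure pass-through, restoring from "changelog-formatted" yields the same store state as live processing.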

Cheers,
Colt McNealy
*Founder, LittleHorse.io*


On Thu, Dec 8, 2022 at 8:41 AM Patrick D’Addona
<patrick.dadd...@maibornwolff.de.invalid> wrote:

> Hello,
>
> I have a quarkus application using
> **org.apache.kafka:kafka-streams:3.1.0** and found that
> * when creating a global table using a compacted topic as input
> * entries that have been deleted at some point
> * are then no longer returned when iterating over the store with
> **store.all()** - as expected
> * but after the pod restarts and its kafka streams state directory is
> deleted, after restoring from the topic using
> **org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl#restoreState**
> * those formerly deleted records are once again returned by that store
> when using **store.all()** - not expected
> * however, **store.get("foo")** still returns null for them - as expected
>
> This is somewhat similar to
> https://issues.apache.org/jira/browse/KAFKA-7663, in that I would like to
> be able to modify this restore behaviour.
> However, it is also different, because I think this behaviour is not
> documented anywhere and it is unintuitive (to me): it changes how the
> application behaves after a restart even though the kafka cluster itself
> was not changed. So I think it's more of a bug than missing documentation.
>
> Some more information, the topic is configured like this
> ```properties
> cleanup.policy: compact
> compression.type: producer
> delete.retention.ms: 86400000
> max.compaction.lag.ms: 9223372036854776000
> min.compaction.lag.ms: 0
> retention.bytes: -1
> retention.ms: 86400000
> ```
>
> I am adding the global store like so
> ```java
> streamsBuilder.addGlobalStore(
>         Stores.timestampedKeyValueStoreBuilder(
>                 Stores.persistentTimestampedKeyValueStore("foobar"),
>                 Serdes.String(),
>                 Serdes.String()),
>         "foo.bar.globaltopic",
>         Consumed.with(Serdes.String(), Serdes.String()),
>         () -> new FooBarUpdateHandler(timeService)
> );
> ```
>
> and here is the definition of 'FooBarUpdateHandler'
> ```java
> import java.time.Instant;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.kafka.streams.processor.api.Processor;
> import org.apache.kafka.streams.processor.api.Record;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.TimestampedKeyValueStore;
> import org.apache.kafka.streams.state.ValueAndTimestamp;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Internal class handling FooBar updates.
>  */
> public class FooBarUpdateHandler implements Processor<String, String,
> Void, Void> {
>
>     private static final Logger logger =
> LoggerFactory.getLogger(FooBarUpdateHandler.class);
>     private TimestampedKeyValueStore<String, String> store;
>
>     @Override
>     public void init(final
> org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void>
> context) {
>         store = context.getStateStore("foobar");
>     }
>
>     @Override
>     public void process(final Record<String, String> record) {
>
>         // handle tombstones from input topic
>         if (record.value() == null || record.value().equals("deleteme")) {
>             store.delete(record.key());
>         } else {
>             store.put(
>                     record.key(),
>                     ValueAndTimestamp.make(
>                             record.value(),
>                             Instant.now().toEpochMilli()
>                     )
>             );
>         }
>
>         // the block below is not part of the logic; it only demonstrates
>         // the issue after restarting and restoring the store from the topic
>         final List<String> existingKeys = new ArrayList<>();
>         try (final KeyValueIterator<String, ValueAndTimestamp<String>> all
> = store.all()) {
>             all.forEachRemaining((r) -> {
>                 existingKeys.add(r.key);
>             });
>         }
>         logger.info("Got {} records in the store, with keys {}",
> existingKeys.size(), String.join(",", existingKeys));
>     }
> }
> ```
>
> My workaround is to add this to the 'init' method of the
> 'FooBarUpdateHandler'
> ```java
> try (final KeyValueIterator<String, ValueAndTimestamp<String>> all =
> store.all()) {
>     if (all == null) {
>         return;
>     }
>     logger.info("Removing already deleted records from rocksdb
> representing the global store {}", storeName);
>     all.forEachRemaining(r -> {
>         if (r != null && r.key != null && store.get(r.key) == null) {
>             store.delete(r.key);
>         }
>     });
> }
> ```
> Now it is again consistent across restarts.
>
> Kind Regards,
> Patrick
>
>
> Patrick D’Addona
> Senior Lead IT Architect
>
>
> Mobile: +49 151 544 22 161
> patrick.dadd...@maibornwolff.de
> Theresienhöhe 13, 80339 München
>
> MaibornWolff GmbH, Theresienhoehe 13, D-80339 Munich, Germany
> www.maibornwolff.de, Phone +49 89 544 253 000
> USt-ID DE 129 299 525, Munich District Court HRB 98058
> Managing Directors: Volker Maiborn, Holger Wolff, Alexander Hofmann,
> Florian Theimer, Marcus Adlwart, Dr. Martina Beck, Christian Loos.
> ____________________________________________________________
>
>
>
