This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 148a2707265 KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders
(5/N) (#21455)
148a2707265 is described below
commit 148a2707265e618d2a9bc1009081cf570983672f
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 25 03:02:32 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (5/N) (#21455)
This PR adds required classes or modifies the existing ones to build the
`TimestampedKeyValueStoreWithHeaders` introduced in KIP-1271.
Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../TimestampedKeyValueStoreWithHeadersTest.java | 557 +++++++++++++++++++++
.../internals/AbstractReadWriteDecorator.java | 15 +-
.../org/apache/kafka/streams/state/Stores.java | 49 +-
.../state/internals/CachingKeyValueStore.java | 18 +-
.../state/internals/KeyValueStoreBuilder.java | 2 +-
.../state/internals/ListValueStoreBuilder.java | 2 +-
...MeteredTimestampedKeyValueStoreWithHeaders.java | 4 +-
.../RocksDBKeyValueBytesStoreSupplier.java | 19 +-
.../RocksDBTimestampedStoreWithHeaders.java | 11 +
.../internals/TimestampedKeyValueStoreBuilder.java | 2 +-
...imestampedKeyValueStoreBuilderWithHeaders.java} | 77 +--
.../TimestampedToHeadersIteratorAdapter.java | 59 +++
.../TimestampedToHeadersStoreAdapter.java | 200 ++++++++
.../CachingInMemoryKeyValueStoreTest.java | 6 +-
.../RocksDBTimestampedStoreWithHeadersTest.java | 17 +
...stampedKeyValueStoreBuilderWithHeadersTest.java | 422 ++++++++++++++++
.../apache/kafka/streams/TopologyTestDriver.java | 34 ++
17 files changed, 1442 insertions(+), 52 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
new file mode 100644
index 00000000000..767615bd105
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Tag("integration")
+public class TimestampedKeyValueStoreWithHeadersTest {
+
+ private static final String STORE_NAME = "headers-store";
+
+ private String inputStream;
+ private String outputStream;
+ private long baseTimestamp;
+
+ private KafkaStreams kafkaStreams;
+
+ private static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ private static final Headers HEADERS1 = new RecordHeaders()
+ .add("source", "test".getBytes())
+ .add("version", "1.0".getBytes());
+
+ private static final Headers HEADERS2 = new RecordHeaders()
+ .add("source", "test".getBytes())
+ .add("version", "2.0".getBytes());
+
+ private static final Headers EMPTY_HEADERS = new RecordHeaders();
+
+ public TestInfo testInfo;
+
+ @BeforeAll
+ public static void before() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void after() {
+ CLUSTER.stop();
+ }
+
+ @BeforeEach
+ public void beforeTest(final TestInfo testInfo) throws
InterruptedException {
+ this.testInfo = testInfo;
+ final String uniqueTestName = safeUniqueTestName(testInfo);
+ inputStream = "input-stream-" + uniqueTestName;
+ outputStream = "output-stream-" + uniqueTestName;
+ CLUSTER.createTopic(inputStream);
+ CLUSTER.createTopic(outputStream);
+
+ baseTimestamp = CLUSTER.time.milliseconds();
+
+ }
+
+ @AfterEach
+ public void afterTest() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ kafkaStreams.cleanUp();
+ }
+ }
+
+ @Test
+ public void shouldPutGetAndDelete() throws Exception {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce source data with headers
+ int numRecordsProduced = 0;
+
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp, HEADERS1,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 5, HEADERS2,
+ KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3,
"c5"));
+
+ numRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 2,
+ EMPTY_HEADERS,
+ KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3,
null));
+
+ // wait for output and verify
+ final List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ numRecordsProduced);
+
+ for (final KeyValue<Integer, Integer> receivedRecord :
receivedRecords) {
+ // verify zero failed checks for each record
+ assertEquals(receivedRecord.value, 0);
+ }
+ }
+
+ @Test
+ public void shouldSetChangelogTopicProperties() throws Exception {
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce record (and wait for result) to create changelog
+ produceDataToTopicWithHeaders(inputStream, baseTimestamp, new
RecordHeaders(), KeyValue.pair(0, "foo"));
+
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ 1);
+
+ // verify changelog topic properties
+ final String changelogTopic =
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME +
"-changelog";
+ final Properties changelogTopicConfig =
CLUSTER.getLogConfig(changelogTopic);
+ assertEquals(changelogTopicConfig.getProperty("cleanup.policy"),
"compact");
+ }
+
+ @Test
+ public void shouldRestore() throws Exception {
+ StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce source data with headers
+ final Map<Integer, Optional<ValueTimestampHeaders<String>>>
expectedData = new HashMap<>();
+ int initialRecordsProduced = 0;
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp, HEADERS1,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+ expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a0",
baseTimestamp, HEADERS1)));
+ expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b0",
baseTimestamp, HEADERS1)));
+ expectedData.put(3, Optional.empty()); // null value
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 5, HEADERS2,
+ KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3,
"c5"));
+ expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a5",
baseTimestamp + 5, HEADERS2)));
+ expectedData.put(2, Optional.empty()); // null value
+ expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c5",
baseTimestamp + 5, HEADERS2)));
+
+ initialRecordsProduced += produceDataToTopicWithHeaders(inputStream,
baseTimestamp + 10, EMPTY_HEADERS,
+ KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3,
"c10"));
+ expectedData.put(1, Optional.of(ValueTimestampHeaders.make("a10",
baseTimestamp + 10, EMPTY_HEADERS)));
+ expectedData.put(2, Optional.of(ValueTimestampHeaders.make("b10",
baseTimestamp + 10, EMPTY_HEADERS)));
+ expectedData.put(3, Optional.of(ValueTimestampHeaders.make("c10",
baseTimestamp + 10, EMPTY_HEADERS)));
+
+ // wait for output
+ IntegrationTestUtils.waitUntilMinRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced);
+
+ // wipe out state store to trigger restore process on restart
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+
+ // restart app - use processor WITHOUT validation of initial data,
just write to store
+ streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce additional records to verify restored store works correctly
+ final Headers finalHeaders = new RecordHeaders().add("final",
"true".getBytes());
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 12, finalHeaders,
+ KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3,
"c12"));
+
+ // wait for output and verify
+ final List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced + additionalRecordsProduced);
+
+ for (final KeyValue<Integer, Integer> receivedRecord :
receivedRecords) {
+ // verify zero failed checks for each record
+ assertEquals(receivedRecord.value, 0);
+ }
+ }
+
+ @Test
+ public void shouldManualUpgradeFromTimestampedToHeaders() throws Exception
{
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(TimestampedStoreContentCheckerProcessor::new, STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
+ }
+
+ private void shouldManualUpgradeFromTimestampedToHeaders(final Topology
originalTopology) throws Exception {
+ // build original timestamped (legacy) topology and start app
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(originalTopology, props);
+ kafkaStreams.start();
+
+ // produce source data to legacy timestamped store (without headers)
+ int initialRecordsProduced = 0;
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp,
+ KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3,
null));
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + 5,
+ KeyValue.pair(1, "a5"), KeyValue.pair(2, null), KeyValue.pair(3,
"c5"));
+ initialRecordsProduced += produceDataToTopic(inputStream,
baseTimestamp + 2,
+ KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3,
null));
+
+ // wait for output and verify
+ List<KeyValue<Integer, Integer>> receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced);
+
+ for (final KeyValue<Integer, Integer> receivedRecord :
receivedRecords) {
+ // verify zero failed checks for each record
+ assertEquals(receivedRecord.value, 0);
+ }
+
+ // wipe out state store to trigger restore process on restart
+ kafkaStreams.close();
+ kafkaStreams.cleanUp();
+
+ // restart app with headers-aware store to test upgrade path
+ // The store should migrate legacy timestamped data (without headers)
+ // and add empty headers to existing data
+ final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+ streamsBuilder
+ .addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.Integer(),
+ Serdes.String()
+ )
+ )
+ .stream(inputStream, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .process(() -> new
TimestampedStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+ .to(outputStream, Produced.with(Serdes.Integer(),
Serdes.Integer()));
+
+ kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
+ kafkaStreams.start();
+
+ // produce additional records with headers to verify upgraded store
works
+ final Headers upgradedHeaders = new RecordHeaders().add("upgraded",
"true".getBytes());
+ final int additionalRecordsProduced =
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 12, upgradedHeaders,
+ KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3,
"c12"));
+
+ // wait for output and verify
+ receivedRecords =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
+ TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerDeserializer.class,
+ IntegerDeserializer.class),
+ outputStream,
+ initialRecordsProduced + additionalRecordsProduced);
+
+ for (final KeyValue<Integer, Integer> receivedRecord :
receivedRecords) {
+ // verify zero failed checks for each record
+ assertEquals(receivedRecord.value, 0);
+ }
+ }
+
+ private Properties props() {
+ final String safeTestName = safeUniqueTestName(testInfo);
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ return streamsConfiguration;
+ }
+
+ /**
+ * @return number of records produced
+ */
+ @SuppressWarnings("varargs")
+ @SafeVarargs
+ private final int produceDataToTopic(final String topic,
+ final long timestamp,
+ final KeyValue<Integer, String>...
keyValues) {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ topic,
+ Arrays.asList(keyValues),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class),
+ timestamp);
+ return keyValues.length;
+ }
+
+
+
+ /**
+ * Produce records with headers.
+ *
+ * @return number of records produced
+ */
+ @SuppressWarnings("varargs")
+ @SafeVarargs
+ private final int produceDataToTopicWithHeaders(final String topic,
+ final long timestamp,
+ final Headers headers,
+ final KeyValue<Integer,
String>... keyValues) {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ topic,
+ Arrays.asList(keyValues),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+ return keyValues.length;
+ }
+
+ /**
+ * Processor that validates the expected contents of a timestamped store with
headers and forwards
+ * the number of failed checks downstream for consumption.
+ */
+ private static class TimestampedStoreWithHeadersContentCheckerProcessor
implements Processor<Integer, String, Integer, Integer> {
+
+ private ProcessorContext<Integer, Integer> context;
+ private TimestampedKeyValueStoreWithHeaders<Integer, String> store;
+
+ // whether the processor should write records to the store as they
arrive.
+ private final boolean writeToStore;
+ // in-memory copy of seen data, to validate for testing purposes.
+ private final Map<Integer, Optional<ValueTimestampHeaders<String>>>
data;
+
+ TimestampedStoreWithHeadersContentCheckerProcessor(final boolean
writeToStore) {
+ this.writeToStore = writeToStore;
+ this.data = new HashMap<>();
+ }
+
+ @Override
+ public void init(final ProcessorContext<Integer, Integer> context) {
+ this.context = context;
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<Integer, String> record) {
+ if (writeToStore) {
+ final ValueTimestampHeaders<String> valueTimestampHeaders =
+ ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers());
+ store.put(record.key(), valueTimestampHeaders);
+ data.put(record.key(),
Optional.ofNullable(valueTimestampHeaders));
+ }
+
+ // check expected contents of store, and signal completion by
forwarding the number of failures downstream
+ final int failedChecks = checkStoreContents();
+ context.forward(record.withValue(failedChecks));
+ }
+
+ /**
+ * @return number of failed checks
+ */
+ private int checkStoreContents() {
+ int failedChecks = 0;
+ for (final Map.Entry<Integer,
Optional<ValueTimestampHeaders<String>>> keyWithValueTimestampHeaders :
data.entrySet()) {
+ final Integer key = keyWithValueTimestampHeaders.getKey();
+ final ValueTimestampHeaders<String>
expectedValueTimestampHeaders =
+ keyWithValueTimestampHeaders.getValue().orElse(null);
+
+ // validate get from store
+ final ValueTimestampHeaders<String>
actualValueTimestampHeaders = store.get(key);
+ if (!Objects.equals(actualValueTimestampHeaders,
expectedValueTimestampHeaders)) {
+ failedChecks++;
+ }
+ }
+ return failedChecks;
+ }
+ }
+
+ /**
+ * Processor for validating expected contents of a timestamped store
(without headers).
+ * Used for testing the upgrade path from TimestampedKeyValueStore to
TimestampedKeyValueStoreWithHeaders.
+ */
+ private static class TimestampedStoreContentCheckerProcessor implements
Processor<Integer, String, Integer, Integer> {
+
+ private ProcessorContext<Integer, Integer> context;
+ private TimestampedKeyValueStore<Integer, String> store;
+
+ // in-memory copy of seen data, to validate for testing purposes.
+ private final Map<Integer, Optional<ValueAndTimestamp<String>>> data;
+
+ TimestampedStoreContentCheckerProcessor() {
+ this.data = new HashMap<>();
+ }
+
+ @Override
+ public void init(final ProcessorContext<Integer, Integer> context) {
+ this.context = context;
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<Integer, String> record) {
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make(record.value(), record.timestamp());
+ store.put(record.key(), valueAndTimestamp);
+ data.put(record.key(), Optional.ofNullable(valueAndTimestamp));
+
+ // check expected contents of store, and signal completion by
forwarding
+ // the number of failures downstream
+ final int failedChecks = checkStoreContents();
+ context.forward(record.withValue(failedChecks));
+ }
+
+ /**
+ * @return number of failed checks
+ */
+ private int checkStoreContents() {
+ int failedChecks = 0;
+ for (final Map.Entry<Integer, Optional<ValueAndTimestamp<String>>>
keyWithValueAndTimestamp : data.entrySet()) {
+ final Integer key = keyWithValueAndTimestamp.getKey();
+ final ValueAndTimestamp<String> valueAndTimestamp =
keyWithValueAndTimestamp.getValue().orElse(null);
+
+ // validate get from store
+ final ValueAndTimestamp<String> record = store.get(key);
+ if (!Objects.equals(record, valueAndTimestamp)) {
+ failedChecks++;
+ }
+ }
+ return failedChecks;
+ }
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 569adad58c3..8748843f034 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;
@@ -61,7 +63,9 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
}
static StateStore wrapWithReadWriteStore(final StateStore store) {
- if (store instanceof TimestampedKeyValueStore) {
+ if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+ return new
TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<>((TimestampedKeyValueStoreWithHeaders<?,
?>) store);
+ } else if (store instanceof TimestampedKeyValueStore) {
return new
TimestampedKeyValueStoreReadWriteDecorator<>((TimestampedKeyValueStore<?, ?>)
store);
} else if (store instanceof VersionedKeyValueStore) {
return new
VersionedKeyValueStoreReadWriteDecorator<>((VersionedKeyValueStore<?, ?>)
store);
@@ -326,4 +330,13 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
return wrapped().fetch(keyFrom, keyTo);
}
}
+
+ static class TimestampedKeyValueStoreReadWriteDecoratorWithHeaders<K, V>
+ extends KeyValueStoreReadWriteDecorator<K, ValueTimestampHeaders<V>>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ TimestampedKeyValueStoreReadWriteDecoratorWithHeaders(final
TimestampedKeyValueStoreWithHeaders<K, V> inner) {
+ super(inner);
+ }
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 101c5c7e145..e36fe042b6a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesSto
import
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
+import
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -94,7 +95,7 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final
String name) {
Objects.requireNonNull(name, "name cannot be null");
- return new RocksDBKeyValueBytesStoreSupplier(name, false);
+ return new RocksDBKeyValueBytesStoreSupplier(name, false, false);
}
/**
@@ -113,7 +114,33 @@ public final class Stores {
*/
public static KeyValueBytesStoreSupplier
persistentTimestampedKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
- return new RocksDBKeyValueBytesStoreSupplier(name, true);
+ return new RocksDBKeyValueBytesStoreSupplier(name, true, false);
+ }
+
+ /**
+ * Create a persistent {@link KeyValueBytesStoreSupplier} that stores
headers along with timestamps.
+ * <p>
+ * This store supplier can be passed into a
+ * {@link
#timestampedKeyValueStoreBuilderWithHeaders(KeyValueBytesStoreSupplier, Serde,
Serde)}
+ * to build a {@link TimestampedKeyValueStoreWithHeaders}.
+ * <p>
+ * The store will persist key-value pairs along with record timestamps and
headers,
+ * using the format specified in KIP-1271. This allows state stores to
maintain
+ * full record context including headers for downstream processing.
+ * <p>
+ * If you want to create a {@link KeyValueStore}, {@link
TimestampedKeyValueStore}, or
+ * {@link VersionedKeyValueStore} you should use {@link
#persistentKeyValueStore(String)},
+ * {@link #persistentTimestampedKeyValueStore(String)}, or
+ * {@link #persistentVersionedKeyValueStore(String, Duration)},
respectively,
+ * to create a store supplier instead.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be
used
+ * to build a persistent key-(headers/timestamp/value) store
+ */
+ public static KeyValueBytesStoreSupplier
persistentTimestampedKeyValueStoreWithHeaders(final String name) {
+ Objects.requireNonNull(name, "name cannot be null");
+ return new RocksDBKeyValueBytesStoreSupplier(name, true, true);
}
/**
@@ -563,4 +590,22 @@ public final class Stores {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde,
Time.SYSTEM);
}
+
+ /**
+ * Creates a {@link StoreBuilder} that can be used to build a {@link
TimestampedKeyValueStoreWithHeaders}.
+ *
+ * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be
{@code null})
+ * @param keySerde the key serde to use
+ * @param valueSerde the value serde to use; if the serialized bytes are
{@code null} for put operations,
+ * it is treated as a delete
+ * @param <K> key type
+ * @param <V> value type
+ * @return an instance of {@link StoreBuilder} that can build a {@link
TimestampedKeyValueStoreWithHeaders}
+ */
+ public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K,
V>> timestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier
supplier,
+
final Serde<K> keySerde,
+
final Serde<V> valueSerde) {
+ Objects.requireNonNull(supplier, "supplier cannot be null");
+ return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier,
keySerde, valueSerde, Time.SYSTEM);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c6304814727..e8588d9146a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -65,7 +65,7 @@ public class CachingKeyValueStore
private Thread streamThread;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Position position;
- private final boolean timestampedSchema;
+ private final CacheType cacheType;
@FunctionalInterface
public interface CacheQueryHandler {
@@ -89,10 +89,10 @@ public class CachingKeyValueStore
);
- CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying, final
boolean timestampedSchema) {
+ CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying, final
CacheType cacheType) {
super(underlying);
position = Position.emptyPosition();
- this.timestampedSchema = timestampedSchema;
+ this.cacheType = cacheType;
}
@Override
@@ -130,6 +130,10 @@ public class CachingKeyValueStore
final PositionBound positionBound,
final QueryConfig config) {
+ if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ }
+
final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
final QueryResult<R> result;
@@ -187,7 +191,7 @@ public class CachingKeyValueStore
final LRUCacheEntry lruCacheEntry =
internalContext.cache().get(cacheName, key);
if (lruCacheEntry != null) {
final byte[] rawValue;
- if (timestampedSchema &&
!WrappedStateStore.isTimestamped(wrapped()) &&
!StoreQueryUtils.isAdapter(wrapped())) {
+ if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE &&
!WrappedStateStore.isTimestamped(wrapped()) &&
!StoreQueryUtils.isAdapter(wrapped())) {
rawValue =
ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
} else {
rawValue = lruCacheEntry.value();
@@ -506,4 +510,10 @@ public class CachingKeyValueStore
lock.writeLock().unlock();
}
}
+
+ public enum CacheType { // tells the caching layer which kind of store it wraps; chosen by the store builders
+ KEY_VALUE_STORE, // passed by KeyValueStoreBuilder and ListValueStoreBuilder
+ TIMESTAMPED_KEY_VALUE_STORE, // cache hits go through ValueAndTimestampDeserializer.rawValue when the wrapped store is neither timestamped nor an adapter
+ TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS // query() throws UnsupportedOperationException for this type
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
index a93656f15d8..0cfbe2d575a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreBuilder.java
@@ -52,7 +52,7 @@ public class KeyValueStoreBuilder<K, V> extends
AbstractStoreBuilder<K, V, KeyVa
if (!enableCaching) {
return inner;
}
- return new CachingKeyValueStore(inner, false);
+ return new CachingKeyValueStore(inner,
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
}
private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final
KeyValueStore<Bytes, byte[]> inner) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
index 653246a7e69..92b4c4d96e9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ListValueStoreBuilder.java
@@ -51,7 +51,7 @@ public class ListValueStoreBuilder<K, V> extends
AbstractStoreBuilder<K, V, KeyV
if (!enableCaching) {
return inner;
}
- return new CachingKeyValueStore(inner, false);
+ return new CachingKeyValueStore(inner,
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
}
private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final
KeyValueStore<Bytes, byte[]> inner) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 4e955c98f1a..df36c04b2d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -102,12 +102,12 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Querying is not supported for
" + getClass().getSimpleName());
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
}
@Override
public Position getPosition() {
- throw new UnsupportedOperationException("Position is not supported for
" + getClass().getSimpleName());
+ throw new UnsupportedOperationException("Position is not supported by
timestamped key-value stores with headers yet.");
}
protected Bytes keyBytes(final K key, final Headers headers) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
index 6f5d6d547db..a2d3426c6c7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java
@@ -24,11 +24,18 @@ public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupp
private final String name;
private final boolean returnTimestampedStore;
+ private final boolean returnHeadersStore;
public RocksDBKeyValueBytesStoreSupplier(final String name,
- final boolean
returnTimestampedStore) {
+ final boolean
returnTimestampedStore,
+ final boolean returnHeadersStore)
{
this.name = name;
this.returnTimestampedStore = returnTimestampedStore;
+ this.returnHeadersStore = returnHeadersStore;
+ if (returnHeadersStore && !returnTimestampedStore) { // headers without timestamps is rejected at construction time, not later in get()
+ throw new IllegalStateException( // NOTE(review): the cause is an invalid argument combination; IllegalArgumentException may fit better — confirm callers/tests before changing
+ "RocksDBKeyValueBytesStoreSupplier cannot return a headers
store without also returning a timestamped store!");
+ }
}
@Override
@@ -38,9 +45,13 @@ public class RocksDBKeyValueBytesStoreSupplier implements
KeyValueBytesStoreSupp
@Override
public KeyValueStore<Bytes, byte[]> get() {
- return returnTimestampedStore ?
- new RocksDBTimestampedStore(name, metricsScope()) :
- new RocksDBStore(name, metricsScope());
+ if (returnHeadersStore && returnTimestampedStore) { // the second operand is already guaranteed by the constructor check
+ return new RocksDBTimestampedStoreWithHeaders(name,
metricsScope());
+ }
+ if (returnTimestampedStore) { // timestamped, headers flag not set
+ return new RocksDBTimestampedStore(name, metricsScope());
+ }
+ return new RocksDBStore(name, metricsScope()); // neither flag set
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
index c4dbb6ea259..52e4ec68bdb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java
@@ -18,6 +18,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -148,4 +152,11 @@ public class RocksDBTimestampedStoreWithHeaders extends
RocksDBStore implements
}
}
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) { // always throws; none of the three arguments is inspected
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ }
+
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
index ded0efe65f0..28516b4c8a7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
@@ -76,7 +76,7 @@ public class TimestampedKeyValueStoreBuilder<K, V>
if (!enableCaching) {
return inner;
}
- return new CachingKeyValueStore(inner, true);
+ return new CachingKeyValueStore(inner,
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE);
}
private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final
KeyValueStore<Bytes, byte[]> inner) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
similarity index 66%
copy from
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
copy to
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index ded0efe65f0..0c09eb2027d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -21,76 +21,88 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.TimestampedBytesStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import java.util.List;
import java.util.Objects;
-public class TimestampedKeyValueStoreBuilder<K, V>
- extends AbstractStoreBuilder<K, ValueAndTimestamp<V>,
TimestampedKeyValueStore<K, V>> {
+/**
+ * Builder for {@link TimestampedKeyValueStoreWithHeaders} instances.
+ * <p>
+ * This is analogous to {@link TimestampedKeyValueStoreBuilder}, but uses
+ * {@link ValueTimestampHeaders} as the value wrapper and wires up the
+ * header-aware store stack (change-logging, caching, metering).
+ */
+public class TimestampedKeyValueStoreBuilderWithHeaders<K, V>
+ extends AbstractStoreBuilder<K, ValueTimestampHeaders<V>,
TimestampedKeyValueStoreWithHeaders<K, V>> {
private final KeyValueBytesStoreSupplier storeSupplier;
- public TimestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier
storeSupplier,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
- final Time time) {
+ public TimestampedKeyValueStoreBuilderWithHeaders(final
KeyValueBytesStoreSupplier storeSupplier,
+ final Serde<K> keySerde,
+ final Serde<V>
valueSerde,
+ final Time time) {
super(
storeSupplier.name(),
keySerde,
- valueSerde == null ? null : new
ValueAndTimestampSerde<>(valueSerde),
- time);
+ valueSerde == null ? null : new
ValueTimestampHeadersSerde<>(valueSerde), // a null value serde is passed on as null instead of being wrapped
+ time
+ ); // NOTE(review): storeSupplier.name() is dereferenced in this super(...) call, so a null supplier fails with a bare NPE before the requireNonNull checks below can report it
Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
Objects.requireNonNull(storeSupplier.metricsScope(), "storeSupplier's
metricsScope can't be null");
this.storeSupplier = storeSupplier;
}
@Override
- public TimestampedKeyValueStore<K, V> build() {
+ public TimestampedKeyValueStoreWithHeaders<K, V> build() {
KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
- if (!(store instanceof TimestampedBytesStore)) {
+
+ if (!(store instanceof HeadersBytesStore)) { // supplied store is not header-aware: wrap it in an adapter or a marker
if (store.persistent()) {
- store = new
KeyValueToTimestampedKeyValueByteStoreAdapter(store);
+ store = new TimestampedToHeadersStoreAdapter(store); // NOTE(review): this adapter's constructor also requires a TimestampedBytesStore, so a plain persistent store would be rejected here — confirm that is intended
} else {
- store = new InMemoryTimestampedKeyValueStoreMarker(store);
+ store = new
InMemoryTimestampedKeyValueStoreWithHeadersMarker(store); // non-persistent store: wrapped in the marker, which implements HeadersBytesStore
}
}
- return new MeteredTimestampedKeyValueStore<>(
+
+ return new MeteredTimestampedKeyValueStoreWithHeaders<>( // outermost layer; the optional caching and logging wrappers are applied inside it
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.metricsScope(),
time,
keySerde,
- valueSerde);
+ valueSerde
+ );
}
private KeyValueStore<Bytes, byte[]> maybeWrapCaching(final
KeyValueStore<Bytes, byte[]> inner) {
if (!enableCaching) {
return inner;
}
- return new CachingKeyValueStore(inner, true);
+ return new CachingKeyValueStore(inner,
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS); // with this cache type CachingKeyValueStore.query() throws UnsupportedOperationException
}
private KeyValueStore<Bytes, byte[]> maybeWrapLogging(final
KeyValueStore<Bytes, byte[]> inner) {
if (!enableLogging) {
return inner;
}
- return new ChangeLoggingTimestampedKeyValueBytesStore(inner);
+ return new
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(inner); // headers-specific change-logging wrapper replaces the timestamped-only one
}
- private static final class InMemoryTimestampedKeyValueStoreMarker
+ private static final class
InMemoryTimestampedKeyValueStoreWithHeadersMarker
extends WrappedStateStore<KeyValueStore<Bytes, byte[]>, Bytes, byte[]>
- implements KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+ implements KeyValueStore<Bytes, byte[]>, HeadersBytesStore {
- private InMemoryTimestampedKeyValueStoreMarker(final
KeyValueStore<Bytes, byte[]> wrapped) {
+ private InMemoryTimestampedKeyValueStoreWithHeadersMarker(final
KeyValueStore<Bytes, byte[]> wrapped) {
super(wrapped);
if (wrapped.persistent()) {
throw new IllegalArgumentException("Provided store must not be
a persistent store, but it is.");
@@ -159,16 +171,15 @@ public class TimestampedKeyValueStoreBuilder<K, V>
@Override
public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
- final long start = config.isCollectExecutionInfo() ?
System.nanoTime() : -1L;
- final QueryResult<R> result = wrapped().query(query,
positionBound, config);
- if (config.isCollectExecutionInfo()) {
- final long end = System.nanoTime();
- result.addExecutionInfo("Handled in " + getClass() + " in " +
(end - start) + "ns");
- }
- return result;
+ final PositionBound positionBound,
+ final QueryConfig config) { // always throws, nothing is delegated to wrapped(); NOTE(review): message wording differs from the sibling stores (by vs. for) — confirm intentional
+
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported by timestamped key-value stores with headers yet.");
+ }
+
+ @Override
+ public Position getPosition() { // always throws; this marker does not expose a position
+ throw new UnsupportedOperationException("Position is not supported
by timestamped key-value stores with headers yet.");
}
@Override
@@ -176,4 +187,4 @@ public class TimestampedKeyValueStoreBuilder<K, V>
return false;
}
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
new file mode 100644
index 00000000000..6a0a23e1c36
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersIteratorAdapter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders}
and
+ * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore}.
+ * <p>
+ * It wraps another {@link KeyValueIterator} over {@code byte[]} values. Keys are
+ * returned unchanged, and every value returned by {@link #next()} is passed through
+ * {@code HeadersBytesStore.convertToHeaderFormat}. {@link #close()},
+ * {@link #peekNextKey()} and {@link #hasNext()} delegate directly to the wrapped iterator.
+ *
+ * @param <K> key type of the wrapped iterator
+ * @see TimestampedToHeadersStoreAdapter
+ */
+
+class TimestampedToHeadersIteratorAdapter<K> implements KeyValueIterator<K,
byte[]> {
+ private final KeyValueIterator<K, byte[]> innerIterator; // wrapped iterator; every call is forwarded to it
+
+ public TimestampedToHeadersIteratorAdapter(final KeyValueIterator<K,
byte[]> innerIterator) {
+ this.innerIterator = innerIterator; // stored as-is, no null check
+ }
+
+ @Override
+ public void close() {
+ innerIterator.close(); // closing the adapter closes the wrapped iterator
+ }
+
+ @Override
+ public K peekNextKey() {
+ return innerIterator.peekNextKey(); // keys are not converted
+ }
+
+ @Override
+ public boolean hasNext() {
+ return innerIterator.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, byte[]> next() {
+ final KeyValue<K, byte[]> timestampedKeyValue = innerIterator.next(); // assumes the wrapped iterator yields timestamped-format values — TODO confirm
+ return KeyValue.pair(timestampedKeyValue.key,
convertToHeaderFormat(timestampedKeyValue.value)); // same key, value converted
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
new file mode 100644
index 00000000000..cbc3bd1608b
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * This class is used to ensure backward compatibility at DSL level between
+ * {@link TimestampedKeyValueStoreWithHeaders} and
+ * {@link TimestampedKeyValueStore}.
+ * <p>
+ * If a user provides a supplier for {@code TimestampedKeyValueStore} (without
headers) via
+ * {@link Materialized#as(KeyValueBytesStoreSupplier)} when building
+ * a {@code TimestampedKeyValueStoreWithHeaders}, this adapter is used to
translate between
+ * the timestamped {@code byte[]} format and the timestamped-with-headers
{@code byte[]} format.
+ * <p>
+ * Write path ({@code put}, {@code putIfAbsent}, {@code putAll}): the headers portion is dropped via
+ * {@link #rawTimestampedValue(byte[])} before the value reaches the wrapped store, so headers are
+ * not persisted by this adapter.
+ * Read path ({@code get}, {@code delete}, the {@code putIfAbsent} return value, all iterators): values
+ * from the wrapped store are passed through {@code HeadersBytesStore.convertToHeaderFormat}.
+ * <p>
+ * {@code query} always throws {@link UnsupportedOperationException}; the lifecycle methods and
+ * {@code getPosition} delegate to the wrapped store.
+ *
+ * @see TimestampedToHeadersIteratorAdapter
+ */
+@SuppressWarnings("unchecked") // NOTE(review): no unchecked cast is apparent in this class — confirm the suppression is still needed
+public class TimestampedToHeadersStoreAdapter implements KeyValueStore<Bytes,
byte[]> {
+ final KeyValueStore<Bytes, byte[]> store; // wrapped store; package-private field
+
+ TimestampedToHeadersStoreAdapter(final KeyValueStore<Bytes, byte[]> store)
{
+ if (!store.persistent()) { // only persistent stores are accepted, which is why persistent() below returns true unconditionally
+ throw new IllegalArgumentException("Provided store must be a
persistent store, but it is not.");
+ }
+ if (!(store instanceof TimestampedBytesStore)) { // the wrapped store must be marked as timestamped
+ throw new IllegalArgumentException("Provided store must be a
timestamped store, but it is not.");
+ }
+ this.store = store;
+ }
+
+ /**
+ * Extract raw timestamped value (timestamp + value) from serialized
ValueTimestampHeaders.
+ * This strips the headers portion but keeps timestamp and value intact.
+ *
+ * Format conversion:
+ * Input: [headersSize(varint)][headers][timestamp(8)][value]
+ * Output: [timestamp(8)][value]
+ *
+ * @param rawValueTimestampHeaders serialized value in the input format above; may be {@code null}
+ * @return a new array holding everything after the headers, or {@code null} for {@code null} input
+ * @throws IllegalArgumentException from {@code ByteBuffer.position(int)} if the decoded headers size
+ *         is negative or exceeds the remaining bytes (no explicit validation is done here)
+ */
+ static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) { // null is passed through unchanged
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer); // the skip below relies on the buffer now pointing at the first header byte
+ // Skip headers, keep timestamp + value
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] result = new byte[buffer.remaining()];
+ buffer.get(result); // copies the tail, so the result does not share the input array
+ return result;
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueWithTimestampAndHeaders) {
+ store.put(key, rawTimestampedValue(valueWithTimestampAndHeaders)); // headers are dropped; a null value stays null
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] valueWithTimestampAndHeaders) {
+ return convertToHeaderFormat(store.putIfAbsent(
+ key,
+ rawTimestampedValue(valueWithTimestampAndHeaders))); // whatever the wrapped store returns is converted to the headers format
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ final byte[] valueWithTimestampAndHeaders = entry.value;
+ store.put(entry.key,
rawTimestampedValue(valueWithTimestampAndHeaders)); // one put per entry; the wrapped store's putAll is not used
+ }
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ return convertToHeaderFormat(store.delete(key)); // the wrapped store's return value, converted to the headers format
+ }
+
+ @Override
+ public String name() {
+ return store.name();
+ }
+
+ @Override
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
+ store.init(stateStoreContext, root); // root is forwarded unchanged
+ }
+
+ @Override
+ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+ store.commit(changelogOffsets);
+ }
+
+ @Override
+ public void close() {
+ store.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return true; // the constructor rejects non-persistent stores
+ }
+
+ @Override
+ public boolean isOpen() {
+ return store.isOpen();
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) { // always throws; nothing is delegated to the wrapped store
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ }
+
+ @Override
+ public Position getPosition() {
+ return store.getPosition(); // unlike query(), position is delegated to the wrapped store
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return convertToHeaderFormat(store.get(key)); // stored value converted to the headers format on the way out
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
+ final Bytes to) {
+ return new TimestampedToHeadersIteratorAdapter<>(store.range(from,
to)); // values are converted as the iterator is consumed
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
+ final Bytes to) {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.reverseRange(from, to));
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ return new TimestampedToHeadersIteratorAdapter<>(store.all());
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseAll() {
+ return new TimestampedToHeadersIteratorAdapter<>(store.reverseAll());
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return new
TimestampedToHeadersIteratorAdapter<>(store.prefixScan(prefix,
prefixKeySerializer)); // prefix and serializer are forwarded unchanged
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return store.approximateNumEntries();
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 4a0579b8ef0..7af5b68e462 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -85,7 +85,7 @@ public class CachingInMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest
final String storeName = "store";
underlyingStore = new InMemoryKeyValueStore(storeName);
cacheFlushListener = new CacheFlushListenerStub<>(new
StringDeserializer(), new StringDeserializer());
- store = new CachingKeyValueStore(underlyingStore, false);
+ store = new CachingKeyValueStore(underlyingStore,
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
store.setFlushListener(cacheFlushListener, false);
cache = new ThreadCache(new LogContext("testCache "),
maxCacheSizeBytes, new MockStreamsMetrics(new Metrics()));
context = new InternalMockProcessorContext<>(null, null, null, null,
cache);
@@ -110,7 +110,7 @@ public class CachingInMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest
@Test
public void shouldDelegateInit() {
final KeyValueStore<Bytes, byte[]> inner =
mock(InMemoryKeyValueStore.class);
- final CachingKeyValueStore outer = new CachingKeyValueStore(inner,
false);
+ final CachingKeyValueStore outer = new CachingKeyValueStore(inner,
CachingKeyValueStore.CacheType.KEY_VALUE_STORE);
when(inner.name()).thenReturn("store");
outer.init(context, outer);
verify(inner).init(context, outer);
@@ -179,7 +179,7 @@ public class CachingInMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest
private void setUpCloseTests() {
underlyingStore = mock(KeyValueStore.class);
when(underlyingStore.name()).thenReturn("store-name");
- store = new CachingKeyValueStore(underlyingStore, false);
+ store = new CachingKeyValueStore(underlyingStore,
CachingKeyValueStore.CacheType.TIMESTAMPED_KEY_VALUE_STORE);
cache = mock(ThreadCache.class);
context = new
InternalMockProcessorContext<>(TestUtils.tempDirectory(), null, null, null,
cache);
context.setRecordContext(new ProcessorRecordContext(10, 0, 0, TOPIC,
new RecordHeaders()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
index 7b1bef67ad3..98a74df76c4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java
@@ -22,6 +22,9 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.junit.jupiter.api.Test;
@@ -579,6 +582,20 @@ public class RocksDBTimestampedStoreWithHeadersTest
extends RocksDBStoreTest {
}
}
+ @Test
+ public void shouldThrowUnsupportedOperationExceptionOnQuery() {
+ rocksDBStore.init(context, rocksDBStore); // the store is initialized before query() is called
+
+ final KeyQuery<Bytes, byte[]> query = KeyQuery.withKey(new
Bytes("test".getBytes())); // arbitrary key; getBytes() without a charset uses the platform default
+
+ final UnsupportedOperationException exception = assertThrows(
+ UnsupportedOperationException.class, // expected from the query() call in the lambda below
+ () -> rocksDBStore.query(query, PositionBound.unbounded(), new
QueryConfig(false))
+ );
+
+ assertTrue(exception.getMessage().contains("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet."));
+ }
+
private byte[] wrapTimestampedValue(final byte[] value) {
// Format: [timestamp(8 bytes)][value]
// Use the numeric value as timestamp
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
new file mode 100644
index 00000000000..3929a11667a
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link TimestampedKeyValueStoreBuilderWithHeaders}.
+ * <p>
+ * The tests cover three areas:
+ * <ul>
+ *   <li>which wrapper layers (metered, caching, change-logging, timestamped-to-headers adapter)
+ *       {@code build()} puts around the store handed out by the supplier,</li>
+ *   <li>how the constructor reacts to {@code null} arguments, and</li>
+ *   <li>that IQv2 queries and {@code getPosition()} are rejected with an
+ *       {@link UnsupportedOperationException}.</li>
+ * </ul>
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class TimestampedKeyValueStoreBuilderWithHeadersTest {
+
+    private static final String STORE_NAME = "test-store";
+    private static final String METRICS_SCOPE = "metrics-scope";
+
+    // Note the different wording ("by" vs. "for"): the tests below expect each variant from different stores.
+    private static final String QUERY_NOT_SUPPORTED_BY =
+        "Queries (IQv2) are not supported by timestamped key-value stores with headers yet.";
+    private static final String QUERY_NOT_SUPPORTED_FOR =
+        "Queries (IQv2) are not supported for timestamped key-value stores with headers yet.";
+    private static final String POSITION_NOT_SUPPORTED =
+        "Position is not supported by timestamped key-value stores with headers yet.";
+
+    @Mock
+    private KeyValueBytesStoreSupplier supplier;
+    @Mock
+    private RocksDBTimestampedStoreWithHeaders inner;
+    private TimestampedKeyValueStoreBuilderWithHeaders<String, String> builder;
+
+    /**
+     * Stubs the supplier's name and metrics scope and creates a fresh {@link #builder} on top of it.
+     * Both stubs are consumed by the builder's constructor, which keeps strict stubbing satisfied.
+     */
+    private void setUpBuilder(final String storeName) {
+        when(supplier.name()).thenReturn(storeName);
+        when(supplier.metricsScope()).thenReturn("metricScope");
+
+        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
+            supplier,
+            Serdes.String(),
+            Serdes.String(),
+            new MockTime()
+        );
+    }
+
+    /** Creates the builder without stubbing {@code supplier.get()}; for tests that never call {@code build()}. */
+    private void setUpWithoutInner() {
+        setUpBuilder("name");
+    }
+
+    /** Creates the builder and lets the supplier hand out the mocked {@link #inner} store. */
+    private void setUp() {
+        when(supplier.get()).thenReturn(inner);
+        setUpWithoutInner();
+    }
+
+    /**
+     * Creates the builder for {@link #STORE_NAME} and builds the store with logging and caching disabled,
+     * so the metered store directly wraps whatever the builder derived from the supplier's store.
+     * {@code supplier.get()} must have been stubbed by the caller.
+     */
+    private TimestampedKeyValueStoreWithHeaders<String, String> buildWithoutLoggingAndCaching() {
+        setUpBuilder(STORE_NAME);
+        return builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+    }
+
+    /** Returns the store directly wrapped by the given {@link WrappedStateStore}. */
+    private static StateStore unwrap(final StateStore store) {
+        return ((WrappedStateStore) store).wrapped();
+    }
+
+    /** Asserts that a key query on the store fails with an exception containing the expected message. */
+    private static void assertQueryNotSupported(final StateStore store, final String expectedMessage) {
+        final KeyQuery<String, ValueTimestampHeaders<String>> query = KeyQuery.withKey("test-key");
+
+        final UnsupportedOperationException exception = assertThrows(
+            UnsupportedOperationException.class,
+            () -> store.query(query, PositionBound.unbounded(), new QueryConfig(false))
+        );
+
+        assertTrue(exception.getMessage().contains(expectedMessage));
+    }
+
+    /** Asserts that {@code getPosition()} on the store fails with the "position not supported" message. */
+    private static void assertPositionNotSupported(final StateStore store) {
+        final UnsupportedOperationException exception = assertThrows(
+            UnsupportedOperationException.class,
+            store::getPosition
+        );
+
+        assertTrue(exception.getMessage().contains(POSITION_NOT_SUPPORTED));
+    }
+
+    @Test
+    public void shouldHaveMeteredStoreAsOuterStore() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder.build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, store);
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreByDefault() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder.build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, store);
+        assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, unwrap(store));
+    }
+
+    @Test
+    public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder.withLoggingDisabled().build();
+        // expected value first, so that a failure message reads correctly
+        assertEquals(inner, unwrap(store));
+    }
+
+    @Test
+    public void shouldHaveCachingStoreWhenEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder.withCachingEnabled().build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, store);
+        assertInstanceOf(CachingKeyValueStore.class, unwrap(store));
+    }
+
+    @Test
+    public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder
+            .withLoggingEnabled(Collections.emptyMap())
+            .build();
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, store);
+        final StateStore changeLogging = unwrap(store);
+        assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, changeLogging);
+        assertEquals(inner, unwrap(changeLogging));
+    }
+
+    @Test
+    public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+        setUp();
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder
+            .withLoggingEnabled(Collections.emptyMap())
+            .withCachingEnabled()
+            .build();
+        // check each layer's type before unwrapping it, so a wrong layer fails as an assertion
+        // rather than as a ClassCastException
+        assertInstanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class, store);
+        final StateStore caching = unwrap(store);
+        assertInstanceOf(CachingKeyValueStore.class, caching);
+        final StateStore changeLogging = unwrap(caching);
+        assertInstanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class, changeLogging);
+        assertEquals(inner, unwrap(changeLogging));
+    }
+
+    @Test
+    public void shouldNotWrapTimestampedByteStore() {
+        setUpWithoutInner();
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders("name", METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, unwrap(store));
+    }
+
+    @Test
+    public void shouldWrapTimestampKeyValueStoreAsHeadersStore() {
+        setUpWithoutInner();
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStore("name", METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = builder
+            .withLoggingDisabled()
+            .withCachingDisabled()
+            .build();
+        assertInstanceOf(TimestampedToHeadersStoreAdapter.class, unwrap(store));
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfInnerIsNull() {
+        setUpWithoutInner();
+        assertThrows(NullPointerException.class, () ->
+            new TimestampedKeyValueStoreBuilderWithHeaders<>(null, Serdes.String(), Serdes.String(), new MockTime()));
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerIfKeySerdeIsNull() {
+        setUpWithoutInner();
+        // does not throw
+        new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, null, Serdes.String(), new MockTime());
+    }
+
+    @Test
+    public void shouldNotThrowNullPointerIfValueSerdeIsNull() {
+        setUpWithoutInner();
+        // does not throw
+        new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, Serdes.String(), null, new MockTime());
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfTimeIsNull() {
+        setUpWithoutInner();
+        assertThrows(NullPointerException.class, () ->
+            new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, Serdes.String(), Serdes.String(), null));
+    }
+
+    @Test
+    public void shouldThrowNullPointerIfMetricsScopeIsNull() {
+        setUpWithoutInner();
+        when(supplier.metricsScope()).thenReturn(null);
+
+        final Exception e = assertThrows(NullPointerException.class,
+            () -> new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, Serdes.String(), Serdes.String(), new MockTime()));
+        assertTrue(e.getMessage().contains("storeSupplier's metricsScope can't be null"));
+    }
+
+    @Test
+    public void shouldThrowUsingIQv2ForInMemoryStores() {
+        when(supplier.get()).thenReturn(new InMemoryKeyValueStore(STORE_NAME));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        assertQueryNotSupported(unwrap(store), QUERY_NOT_SUPPORTED_BY);
+    }
+
+    @Test
+    public void shouldThrowWhenUsingIQv2InHeadersStore() {
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        final StateStore wrapped = unwrap(store);
+        assertInstanceOf(TimestampedToHeadersStoreAdapter.class, wrapped);
+
+        assertQueryNotSupported(wrapped, QUERY_NOT_SUPPORTED_FOR);
+    }
+
+    @Test
+    public void shouldThrowWhenPlainKeyValueStoreIsProvided() {
+        when(supplier.get()).thenReturn(new RocksDBStore(STORE_NAME, METRICS_SCOPE));
+        // construct the builder outside assertThrows: only build() is expected to throw
+        setUpBuilder(STORE_NAME);
+
+        final IllegalArgumentException exception = assertThrows(
+            IllegalArgumentException.class,
+            () -> builder.withLoggingDisabled().withCachingDisabled().build()
+        );
+
+        assertTrue(exception.getMessage().contains("Provided store must be a timestamped store"));
+    }
+
+    @Test
+    public void shouldThrowUsingIQv2ForNativeHeadersStore() {
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders(STORE_NAME, METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        final StateStore wrapped = unwrap(store);
+        assertInstanceOf(RocksDBTimestampedStoreWithHeaders.class, wrapped);
+
+        assertQueryNotSupported(wrapped, QUERY_NOT_SUPPORTED_FOR);
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForInMemoryStores() {
+        when(supplier.get()).thenReturn(new InMemoryKeyValueStore(STORE_NAME));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        assertPositionNotSupported(store);
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForInMemoryStoreMarker() {
+        when(supplier.get()).thenReturn(new InMemoryKeyValueStore(STORE_NAME));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        // Unwrap to get directly to the InMemoryTimestampedKeyValueStoreWithHeadersMarker
+        final StateStore wrapped = unwrap(store);
+        assertInstanceOf(KeyValueStore.class, wrapped);
+        assertInstanceOf(HeadersBytesStore.class, wrapped);
+
+        assertPositionNotSupported(wrapped);
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStore(STORE_NAME, METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        assertPositionNotSupported(store);
+    }
+
+    @Test
+    public void shouldThrowOnGetPositionForNativeHeadersStore() {
+        when(supplier.get()).thenReturn(new RocksDBTimestampedStoreWithHeaders(STORE_NAME, METRICS_SCOPE));
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = buildWithoutLoggingAndCaching();
+
+        assertPositionNotSupported(store);
+    }
+
+}
\ No newline at end of file
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 65fcb58c24f..6ebf325bda0 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -82,8 +82,10 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -874,6 +876,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -905,6 +908,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -985,6 +989,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -1012,6 +1017,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -1035,6 +1041,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
@@ -1045,6 +1052,30 @@ public class TopologyTestDriver implements Closeable {
return store instanceof VersionedKeyValueStore ?
(VersionedKeyValueStore<K, V>) store : null;
}
+    /**
+     * Look up the {@link TimestampedKeyValueStoreWithHeaders} registered under the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * Test cases typically use this to pre-populate the store before instructing the topology to
+     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, and/or to inspect the store afterward.
+     *
+     * @param name the name of the store
+     * @return the key value store, or {@code null} if no {@link TimestampedKeyValueStoreWithHeaders} has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getVersionedKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> getTimestampedKeyValueStoreWithHeaders(final String name) {
+        final StateStore candidate = getStateStore(name, false);
+        // a store of any other type is reported as "not registered"
+        if (!(candidate instanceof TimestampedKeyValueStoreWithHeaders)) {
+            return null;
+        }
+        return (TimestampedKeyValueStoreWithHeaders<K, V>) candidate;
+    }
+
/**
* Get the {@link WindowStore} or {@link TimestampedWindowStore} with the
given name.
* The store can be a "regular" or global store.
@@ -1064,6 +1095,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
*/
@@ -1091,6 +1123,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
* @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getSessionStore(String)
*/
@@ -1116,6 +1149,7 @@ public class TopologyTestDriver implements Closeable {
* @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {