This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab5a07abadf KAFKA-20218: Add downgrade tests in TimestampedKeyValueStoreWithHeadersTest (#21667)
ab5a07abadf is described below
commit ab5a07abadf0ee29faece540862792a6368ff13d
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 9 22:48:56 2026 +0100
KAFKA-20218: Add downgrade tests in TimestampedKeyValueStoreWithHeadersTest (#21667)
Adding two integration tests to verify whether downgrading from header
key value store to timestamped key value store is possible. Directly
downgrading is not supported; the only option is to delete the store
entirely and restore it from the changelog.
Reviewers: Matthias J. Sax <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 126 +++++++++++++++++++++
1 file changed, 126 insertions(+)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 2976f0713fe..a9a3c9a9e32 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -269,6 +269,34 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
+ private <K, V> void verifyLegacyTimestampedValue(final K key,
+ final V value,
+ final long timestamp)
+ throws Exception {
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store =
+                        IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final ValueAndTimestamp<V> result = store.get(key);
+                    return result != null && result.value().equals(value) && result.timestamp() == timestamp;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
+
    private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final K key,
                                                                        final V value,
                                                                        final long timestamp,
@@ -653,6 +681,82 @@ public class HeadersStoreUpgradeIntegrationTest {
}
}
+ @Test
+    public void shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStore() throws Exception {
+ final Properties props = props();
+ setupAndPopulateKeyValueStoreWithHeaders(props);
+ kafkaStreams = null;
+
+ // Attempt to downgrade to non-headers key-value store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("headers-aware") &&
+ cause.getMessage().contains("Downgrade")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException about downgrade not being supported, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be thrown when attempting to downgrade from headers-aware to non-headers key-value store");
+ }
+ }
+
+ @Test
+    public void shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStoreAfterCleanup() throws Exception {
+ final Properties props = props();
+ setupAndPopulateKeyValueStoreWithHeaders(props);
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ // verify legacy key, values
+ verifyLegacyTimestampedValue("key1", "value1", 11L);
+ verifyLegacyTimestampedValue("key2", "value2", 22L);
+
+ processKeyValueAndVerifyTimestampedValue("key3", "value3", 333L);
+ processKeyValueAndVerifyTimestampedValue("key4", "value4", 444L);
+
+ kafkaStreams.close();
+ }
+
@Test
     public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore() throws Exception {
final Properties props = props();
@@ -809,6 +913,28 @@ public class HeadersStoreUpgradeIntegrationTest {
return CLUSTER.time.milliseconds();
}
+    private void setupAndPopulateKeyValueStoreWithHeaders(final Properties props) throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key1", "value1", 11L, headers, headers);
+        processKeyValueWithTimestampAndHeadersAndVerify("key2", "value2", 22L, headers, headers);
+
+ kafkaStreams.close();
+ }
+
     private void produceRecordWithHeaders(final String key, final String value, final long timestamp) throws Exception {
final Headers headers = new RecordHeaders();
headers.add("source", "test".getBytes());