This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aaccf542d12 KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
aaccf542d12 is described below
commit aaccf542d126a6415988ee0fa53ec87c715eb622
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jan 19 09:23:42 2024 -0800
KAFKA-16141: Fix StreamsStandbyTask system test (#15217)
KAFKA-15629 added `TimestampedByteStore` interface to
`KeyValueToTimestampedKeyValueByteStoreAdapter` which break the restore
code path and thus some system tests.
This PR reverts this change for now.
Reviewers: Almog Gavra <[email protected]>, Walker Carlson
<[email protected]>
---
.../kafka/streams/state/internals/CachingKeyValueStore.java | 2 +-
.../KeyValueToTimestampedKeyValueByteStoreAdapter.java | 3 +--
.../kafka/streams/state/internals/StoreQueryUtils.java | 12 +++++++++++-
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8ba3052a9f4..36f08412828 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -187,7 +187,7 @@ public class CachingKeyValueStore
final LRUCacheEntry lruCacheEntry = context.cache().get(cacheName,
key);
if (lruCacheEntry != null) {
final byte[] rawValue;
- if (timestampedSchema &&
!WrappedStateStore.isTimestamped(wrapped())) {
+ if (timestampedSchema &&
!WrappedStateStore.isTimestamped(wrapped()) &&
!StoreQueryUtils.isAdapter(wrapped())) {
rawValue =
ValueAndTimestampDeserializer.rawValue(lruCacheEntry.value());
} else {
rawValue = lruCacheEntry.value();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
index fb09b2b6be3..a467c4db31c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java
@@ -35,7 +35,6 @@ import
org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.TimestampedBytesStore;
import java.util.List;
@@ -54,7 +53,7 @@ import static
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserial
*/
@SuppressWarnings("unchecked")
-public class KeyValueToTimestampedKeyValueByteStoreAdapter implements
KeyValueStore<Bytes, byte[]>, TimestampedBytesStore {
+public class KeyValueToTimestampedKeyValueByteStoreAdapter implements
KeyValueStore<Bytes, byte[]> {
final KeyValueStore<Bytes, byte[]> store;
KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes,
byte[]> store) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 255f597600a..1609e8b2c5d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -410,7 +410,7 @@ public final class StoreQueryUtils {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<byte[], V> getDeserializeValue(final
StateSerdes<?, V> serdes, final StateStore wrapped) {
final Serde<V> valueSerde = serdes.valueSerde();
- final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
+ final boolean timestamped = WrappedStateStore.isTimestamped(wrapped)
|| isAdapter(wrapped);
final Deserializer<V> deserializer;
if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
final ValueAndTimestampDeserializer valueAndTimestampDeserializer =
@@ -422,6 +422,16 @@ public final class StoreQueryUtils {
return byteArray -> deserializer.deserialize(serdes.topic(),
byteArray);
}
+ public static boolean isAdapter(final StateStore stateStore) {
+ if (stateStore instanceof
KeyValueToTimestampedKeyValueByteStoreAdapter) {
+ return true;
+ } else if (stateStore instanceof WrappedStateStore) {
+ return isAdapter(((WrappedStateStore) stateStore).wrapped());
+ } else {
+ return false;
+ }
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>>
getDeserializeValue(final StateSerdes<?, V> serdes) {
final Serde<V> valueSerde = serdes.valueSerde();