This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cb7d0833ee0 KAFKA-14834: [1/N] Add timestamped get to
KTableValueGetter (#13496)
cb7d0833ee0 is described below
commit cb7d0833ee02e190a194cc5bd28fd2b3ac31cccb
Author: Victoria Xia <[email protected]>
AuthorDate: Tue Apr 11 23:40:11 2023 -0400
KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496)
In preparation for updating DSL processors to use versioned stores (cf.
KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned() and
get(key, asOfTimestamp), and updates all existing implementations accordingly.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kstream/internals/KStreamAggregate.java | 10 +++++++++
.../streams/kstream/internals/KStreamReduce.java | 10 +++++++++
.../internals/KStreamSessionWindowAggregate.java | 5 +++++
.../internals/KStreamSlidingWindowAggregate.java | 5 +++++
.../kstream/internals/KStreamWindowAggregate.java | 5 +++++
.../streams/kstream/internals/KTableFilter.java | 10 +++++++++
.../kstream/internals/KTableKTableInnerJoin.java | 11 ++++++++-
.../kstream/internals/KTableKTableLeftJoin.java | 8 +++++++
.../kstream/internals/KTableKTableOuterJoin.java | 8 +++++++
.../kstream/internals/KTableKTableRightJoin.java | 8 +++++++
.../streams/kstream/internals/KTableMapValues.java | 10 +++++++++
.../KTableMaterializedValueGetterSupplier.java | 10 +++++++++
.../kstream/internals/KTablePassThrough.java | 9 ++++++++
.../kstream/internals/KTableRepartitionMap.java | 23 ++++++++++++++-----
.../internals/KTableSourceValueGetterSupplier.java | 10 +++++++++
.../kstream/internals/KTableTransformValues.java | 26 ++++++++++++++++------
.../kstream/internals/KTableValueGetter.java | 16 +++++++++++++
.../suppress/KTableSuppressProcessorSupplier.java | 5 +++++
.../state/internals/KeyValueStoreWrapper.java | 12 ++++++++++
...reignJoinSubscriptionProcessorSupplierTest.java | 5 +++++
...scriptionResolverJoinProcessorSupplierTest.java | 5 +++++
21 files changed, 198 insertions(+), 13 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 27a3d488918..69d346c75e1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -152,5 +152,15 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements
KStreamAggProcessorSupp
public ValueAndTimestamp<VAgg> get(final KIn key) {
return store.get(key);
}
+
+ @Override
+ public ValueAndTimestamp<VAgg> get(final KIn key, final long
asOfTimestamp) {
+ return store.get(key, asOfTimestamp);
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return store.isVersionedStore();
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index b801d2b60ea..be420579cae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -147,6 +147,16 @@ public class KStreamReduce<K, V> implements
KStreamAggProcessorSupplier<K, V, K,
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
+
+ @Override
+ public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp)
{
+ return store.get(key, asOfTimestamp);
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return store.isVersionedStore();
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f8252358b08..2a3f27c68fc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -379,5 +379,10 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
store.fetchSession(key.key(), key.window().start(),
key.window().end()),
key.window().end());
}
+
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index e75427d6b89..98f5e812746 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -499,5 +499,10 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg>
implements KStreamAgg
@Override
public void close() {}
+
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 561524f87e7..4fb4f9c00ad 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -210,5 +210,10 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W
extends Window> implements
final W window = (W) windowedKey.window();
return windowStore.fetch(key, window.start());
}
+
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 6287aa40c55..f3d6edd26e0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -177,6 +177,16 @@ class KTableFilter<KIn, VIn> implements
KTableProcessorSupplier<KIn, VIn, KIn, V
return computeValue(key, parentGetter.get(key));
}
+ @Override
+ public ValueAndTimestamp<VIn> get(final KIn key, final long
asOfTimestamp) {
+ return computeValue(key, parentGetter.get(key, asOfTimestamp));
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return parentGetter.isVersioned();
+ }
+
@Override
public void close() {
parentGetter.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index d9ac0ae8852..0f264255597 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -157,7 +157,8 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends
KTableKTableAbstractJoin<K,
final V1 value1 = getValueOrNull(valueAndTimestamp1);
if (value1 != null) {
- final ValueAndTimestamp<V2> valueAndTimestamp2 =
valueGetter2.get(keyValueMapper.apply(key, value1));
+ final ValueAndTimestamp<V2> valueAndTimestamp2
+ = valueGetter2.get(keyValueMapper.apply(key, value1));
final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (value2 != null) {
@@ -172,6 +173,14 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends
KTableKTableAbstractJoin<K,
}
}
+ @Override
+ public boolean isVersioned() {
+ // even though we can derive a proper versioned result (assuming
both parent value
+ // getters are versioned), we choose not to since the output of a
join of two
+ // versioned tables today is not considered versioned (cf KIP-914)
+ return false;
+ }
+
@Override
public void close() {
valueGetter1.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index c1baa088fe1..74971617983 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -177,6 +177,14 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends
KTableKTableAbstractJoin<K,
}
}
+ @Override
+ public boolean isVersioned() {
+ // even though we can derive a proper versioned result (assuming
both parent value
+ // getters are versioned), we choose not to since the output of a
join of two
+ // versioned tables today is not considered versioned (cf KIP-914)
+ return false;
+ }
+
@Override
public void close() {
valueGetter1.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 0ee0cc449f1..654c9094625 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -185,6 +185,14 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends
KTableKTableAbstractJoin<K,
return ValueAndTimestamp.make(newValue, Math.max(timestamp1,
timestamp2));
}
+ @Override
+ public boolean isVersioned() {
+ // even though we can derive a proper versioned result (assuming
both parent value
+ // getters are versioned), we choose not to since the output of a
join of two
+ // versioned tables today is not considered versioned (cf KIP-914)
+ return false;
+ }
+
@Override
public void close() {
valueGetter1.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 61c6efa2bc3..9a70e858914 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -168,6 +168,14 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends
KTableKTableAbstractJoin<K,
}
}
+ @Override
+ public boolean isVersioned() {
+ // even though we can derive a proper versioned result (assuming
both parent value
+ // getters are versioned), we choose not to since the output of a
join of two
+ // versioned tables today is not considered versioned (cf KIP-914)
+ return false;
+ }
+
@Override
public void close() {
valueGetter1.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index f0897dcdb05..afc135628f0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -164,6 +164,16 @@ class KTableMapValues<KIn, VIn, VOut> implements
KTableProcessorSupplier<KIn, VI
return computeValueAndTimestamp(key, parentGetter.get(key));
}
+ @Override
+ public ValueAndTimestamp<VOut> get(final KIn key, final long
asOfTimestamp) {
+ return computeValueAndTimestamp(key, parentGetter.get(key,
asOfTimestamp));
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return parentGetter.isVersioned();
+ }
+
@Override
public void close() {
parentGetter.close();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index ba7e65081d5..afe648ca219 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -48,5 +48,15 @@ public class KTableMaterializedValueGetterSupplier<K, V>
implements KTableValueG
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
+
+ @Override
+ public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp)
{
+ return store.get(key, asOfTimestamp);
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return store.isVersionedStore();
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
index 91fe0e4a277..87fc0f49de9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
@@ -91,5 +91,14 @@ public class KTablePassThrough<KIn, VIn> implements
KTableProcessorSupplier<KIn,
return store.get(key);
}
+ @Override
+ public ValueAndTimestamp<VIn> get(final KIn key, final long
asOfTimestamp) {
+ return store.get(key, asOfTimestamp);
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return store.isVersionedStore();
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 2ff826c0110..849d5bf7a0c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -172,17 +172,30 @@ public class KTableRepartitionMap<K, V, K1, V1>
implements KTableRepartitionMapS
@Override
public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key) {
- final ValueAndTimestamp<V> valueAndTimestamp =
parentGetter.get(key);
- return ValueAndTimestamp.make(
- mapper.apply(key, getValueOrNull(valueAndTimestamp)),
- valueAndTimestamp == null ? context.timestamp() :
valueAndTimestamp.timestamp()
- );
+ return mapValue(key, parentGetter.get(key));
+ }
+
+ @Override
+ public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key, final long
asOfTimestamp) {
+ return mapValue(key, parentGetter.get(key, asOfTimestamp));
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return parentGetter.isVersioned();
}
@Override
public void close() {
parentGetter.close();
}
+
+ private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key,
final ValueAndTimestamp<V> valueAndTimestamp) {
+ return ValueAndTimestamp.make(
+ mapper.apply(key, getValueOrNull(valueAndTimestamp)),
+ valueAndTimestamp == null ? context.timestamp() :
valueAndTimestamp.timestamp()
+ );
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 1c340943354..8599a50f60b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -47,5 +47,15 @@ public class KTableSourceValueGetterSupplier<K, V>
implements KTableValueGetterS
public ValueAndTimestamp<V> get(final K key) {
return store.get(key);
}
+
+ @Override
+ public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp)
{
+ return store.get(key, asOfTimestamp);
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return store.isVersionedStore();
+ }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 5975571f0fa..ba1705b033b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -153,8 +153,26 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
@Override
public ValueAndTimestamp<VOut> get(final K key) {
- final ValueAndTimestamp<V> valueAndTimestamp =
parentGetter.get(key);
+ return transformValue(key, parentGetter.get(key));
+ }
+
+ @Override
+ public ValueAndTimestamp<VOut> get(final K key, final long
asOfTimestamp) {
+ return transformValue(key, parentGetter.get(key, asOfTimestamp));
+ }
+
+ @Override
+ public boolean isVersioned() {
+ return parentGetter.isVersioned();
+ }
+ @Override
+ public void close() {
+ parentGetter.close();
+ valueTransformer.close();
+ }
+
+ private ValueAndTimestamp<VOut> transformValue(final K key, final
ValueAndTimestamp<V> valueAndTimestamp) {
final ProcessorRecordContext currentContext =
internalProcessorContext.recordContext();
internalProcessorContext.setRecordContext(new
ProcessorRecordContext(
@@ -177,11 +195,5 @@ class KTableTransformValues<K, V, VOut> implements
KTableProcessorSupplier<K, V,
return result;
}
-
- @Override
- public void close() {
- parentGetter.close();
- valueTransformer.close();
- }
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 6303b3d013c..98705cf3407 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -25,5 +25,21 @@ public interface KTableValueGetter<K, V> {
ValueAndTimestamp<V> get(K key);
+ /**
+ * Returns the latest record version, associated with the provided key,
with timestamp
+ * not exceeding the provided timestamp bound. This method may only be
called if
+ * {@link #isVersioned()} is true.
+ */
+ default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+ throw new UnsupportedOperationException("get(key, timestamp) is only
supported for versioned stores");
+ }
+
+ /**
+ * @return whether this value getter supports multiple record versions for
the same key.
+ * If true, then {@link #get(Object, long)} must be implemented.
If not, then
+ * {@link #get(Object, long)} must not be called.
+ */
+ boolean isVersioned();
+
default void close() {}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 96c103a39a4..ea635c3ebc2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -89,6 +89,11 @@ public class KTableSuppressProcessorSupplier<K, V> implements
}
}
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
+
@Override
public void close() {
// the main processor is responsible for the buffer's
lifecycle
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index 9c2920570aa..9fb2dc2123f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -81,6 +81,14 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
throw new IllegalStateException("KeyValueStoreWrapper must be
initialized with either timestamped or versioned store");
}
+ public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+ if (!isVersionedStore()) {
+ throw new UnsupportedOperationException("get(key, timestamp) is
only supported for versioned stores");
+ }
+ final VersionedRecord<V> versionedRecord = versionedStore.get(key,
asOfTimestamp);
+ return versionedRecord == null ? null :
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+ }
+
public void put(final K key, final V value, final long timestamp) {
if (timestampedStore != null) {
timestampedStore.put(key, ValueAndTimestamp.make(value,
timestamp));
@@ -97,6 +105,10 @@ public class KeyValueStoreWrapper<K, V> implements
StateStore {
return store;
}
+ public boolean isVersionedStore() {
+ return versionedStore != null;
+ }
+
@Override
public String name() {
return store.name();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
index 1bf708b8ab5..6757ccca20b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
@@ -352,6 +352,11 @@ public class ForeignJoinSubscriptionProcessorSupplierTest {
public void init(final ProcessorContext context) {
}
+
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
};
return new KTableValueGetterSupplier<String, String>() {
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index dd794b0107d..afd5f490db2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -56,6 +56,11 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
public ValueAndTimestamp<V> get(final K key) {
return ValueAndTimestamp.make(map.get(key), -1);
}
+
+ @Override
+ public boolean isVersioned() {
+ return false;
+ }
};
}