This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cb7d0833ee0 KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496)
cb7d0833ee0 is described below

commit cb7d0833ee02e190a194cc5bd28fd2b3ac31cccb
Author: Victoria Xia <[email protected]>
AuthorDate: Tue Apr 11 23:40:11 2023 -0400

    KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter (#13496)
    
    In preparation for updating DSL processors to use versioned stores (cf
    KIP-914), this PR adds two new methods to KTableValueGetter: isVersioned()
    and get(key, asOfTimestamp) and updates all existing implementations
    accordingly.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kstream/internals/KStreamAggregate.java        | 10 +++++++++
 .../streams/kstream/internals/KStreamReduce.java   | 10 +++++++++
 .../internals/KStreamSessionWindowAggregate.java   |  5 +++++
 .../internals/KStreamSlidingWindowAggregate.java   |  5 +++++
 .../kstream/internals/KStreamWindowAggregate.java  |  5 +++++
 .../streams/kstream/internals/KTableFilter.java    | 10 +++++++++
 .../kstream/internals/KTableKTableInnerJoin.java   | 11 ++++++++-
 .../kstream/internals/KTableKTableLeftJoin.java    |  8 +++++++
 .../kstream/internals/KTableKTableOuterJoin.java   |  8 +++++++
 .../kstream/internals/KTableKTableRightJoin.java   |  8 +++++++
 .../streams/kstream/internals/KTableMapValues.java | 10 +++++++++
 .../KTableMaterializedValueGetterSupplier.java     | 10 +++++++++
 .../kstream/internals/KTablePassThrough.java       |  9 ++++++++
 .../kstream/internals/KTableRepartitionMap.java    | 23 ++++++++++++++-----
 .../internals/KTableSourceValueGetterSupplier.java | 10 +++++++++
 .../kstream/internals/KTableTransformValues.java   | 26 ++++++++++++++++------
 .../kstream/internals/KTableValueGetter.java       | 16 +++++++++++++
 .../suppress/KTableSuppressProcessorSupplier.java  |  5 +++++
 .../state/internals/KeyValueStoreWrapper.java      | 12 ++++++++++
 ...reignJoinSubscriptionProcessorSupplierTest.java |  5 +++++
 ...scriptionResolverJoinProcessorSupplierTest.java |  5 +++++
 21 files changed, 198 insertions(+), 13 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 27a3d488918..69d346c75e1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -152,5 +152,15 @@ public class KStreamAggregate<KIn, VIn, VAgg> implements 
KStreamAggProcessorSupp
         public ValueAndTimestamp<VAgg> get(final KIn key) {
             return store.get(key);
         }
+
+        @Override
+        public ValueAndTimestamp<VAgg> get(final KIn key, final long 
asOfTimestamp) {
+            return store.get(key, asOfTimestamp);
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return store.isVersionedStore();
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index b801d2b60ea..be420579cae 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -147,6 +147,16 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, V, K,
         public ValueAndTimestamp<V> get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) 
{
+            return store.get(key, asOfTimestamp);
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return store.isVersionedStore();
+        }
     }
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index f8252358b08..2a3f27c68fc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -379,5 +379,10 @@ public class KStreamSessionWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
                 store.fetchSession(key.key(), key.window().start(), 
key.window().end()),
                 key.window().end());
         }
+
+        @Override
+        public boolean isVersioned() {
+            return false;
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
index e75427d6b89..98f5e812746 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
@@ -499,5 +499,10 @@ public class KStreamSlidingWindowAggregate<KIn, VIn, VAgg> 
implements KStreamAgg
 
         @Override
         public void close() {}
+
+        @Override
+        public boolean isVersioned() {
+            return false;
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 561524f87e7..4fb4f9c00ad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -210,5 +210,10 @@ public class KStreamWindowAggregate<KIn, VIn, VAgg, W 
extends Window> implements
             final W window = (W) windowedKey.window();
             return windowStore.fetch(key, window.start());
         }
+
+        @Override
+        public boolean isVersioned() {
+            return false;
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index 6287aa40c55..f3d6edd26e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -177,6 +177,16 @@ class KTableFilter<KIn, VIn> implements 
KTableProcessorSupplier<KIn, VIn, KIn, V
             return computeValue(key, parentGetter.get(key));
         }
 
+        @Override
+        public ValueAndTimestamp<VIn> get(final KIn key, final long 
asOfTimestamp) {
+            return computeValue(key, parentGetter.get(key, asOfTimestamp));
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return parentGetter.isVersioned();
+        }
+
         @Override
         public void close() {
             parentGetter.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index d9ac0ae8852..0f264255597 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -157,7 +157,8 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             final V1 value1 = getValueOrNull(valueAndTimestamp1);
 
             if (value1 != null) {
-                final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter2.get(keyValueMapper.apply(key, value1));
+                final ValueAndTimestamp<V2> valueAndTimestamp2
+                    = valueGetter2.get(keyValueMapper.apply(key, value1));
                 final V2 value2 = getValueOrNull(valueAndTimestamp2);
 
                 if (value2 != null) {
@@ -172,6 +173,14 @@ class KTableKTableInnerJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             }
         }
 
+        @Override
+        public boolean isVersioned() {
+            // even though we can derive a proper versioned result (assuming both parent value
+            // getters are versioned), we choose not to since the output of a join of two
+            // versioned tables today is not considered versioned (cf KIP-914)
+            return false;
+        }
+
         @Override
         public void close() {
             valueGetter1.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index c1baa088fe1..74971617983 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -177,6 +177,14 @@ class KTableKTableLeftJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             }
         }
 
+        @Override
+        public boolean isVersioned() {
+            // even though we can derive a proper versioned result (assuming both parent value
+            // getters are versioned), we choose not to since the output of a join of two
+            // versioned tables today is not considered versioned (cf KIP-914)
+            return false;
+        }
+
         @Override
         public void close() {
             valueGetter1.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 0ee0cc449f1..654c9094625 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -185,6 +185,14 @@ class KTableKTableOuterJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             return ValueAndTimestamp.make(newValue, Math.max(timestamp1, 
timestamp2));
         }
 
+        @Override
+        public boolean isVersioned() {
+            // even though we can derive a proper versioned result (assuming both parent value
+            // getters are versioned), we choose not to since the output of a join of two
+            // versioned tables today is not considered versioned (cf KIP-914)
+            return false;
+        }
+
         @Override
         public void close() {
             valueGetter1.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 61c6efa2bc3..9a70e858914 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -168,6 +168,14 @@ class KTableKTableRightJoin<K, V1, V2, VOut> extends 
KTableKTableAbstractJoin<K,
             }
         }
 
+        @Override
+        public boolean isVersioned() {
+            // even though we can derive a proper versioned result (assuming both parent value
+            // getters are versioned), we choose not to since the output of a join of two
+            // versioned tables today is not considered versioned (cf KIP-914)
+            return false;
+        }
+
         @Override
         public void close() {
             valueGetter1.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index f0897dcdb05..afc135628f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -164,6 +164,16 @@ class KTableMapValues<KIn, VIn, VOut> implements 
KTableProcessorSupplier<KIn, VI
             return computeValueAndTimestamp(key, parentGetter.get(key));
         }
 
+        @Override
+        public ValueAndTimestamp<VOut> get(final KIn key, final long 
asOfTimestamp) {
+            return computeValueAndTimestamp(key, parentGetter.get(key, 
asOfTimestamp));
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return parentGetter.isVersioned();
+        }
+
         @Override
         public void close() {
             parentGetter.close();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index ba7e65081d5..afe648ca219 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -48,5 +48,15 @@ public class KTableMaterializedValueGetterSupplier<K, V> 
implements KTableValueG
         public ValueAndTimestamp<V> get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) 
{
+            return store.get(key, asOfTimestamp);
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return store.isVersionedStore();
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
index 91fe0e4a277..87fc0f49de9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTablePassThrough.java
@@ -91,5 +91,14 @@ public class KTablePassThrough<KIn, VIn> implements 
KTableProcessorSupplier<KIn,
             return store.get(key);
         }
 
+        @Override
+        public ValueAndTimestamp<VIn> get(final KIn key, final long 
asOfTimestamp) {
+            return store.get(key, asOfTimestamp);
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return store.isVersionedStore();
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 2ff826c0110..849d5bf7a0c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -172,17 +172,30 @@ public class KTableRepartitionMap<K, V, K1, V1> 
implements KTableRepartitionMapS
 
         @Override
         public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key) {
-            final ValueAndTimestamp<V> valueAndTimestamp = 
parentGetter.get(key);
-            return ValueAndTimestamp.make(
-                mapper.apply(key, getValueOrNull(valueAndTimestamp)),
-                valueAndTimestamp == null ? context.timestamp() : 
valueAndTimestamp.timestamp()
-            );
+            return mapValue(key, parentGetter.get(key));
+        }
+
+        @Override
+        public ValueAndTimestamp<KeyValue<K1, V1>> get(final K key, final long 
asOfTimestamp) {
+            return mapValue(key, parentGetter.get(key, asOfTimestamp));
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return parentGetter.isVersioned();
         }
 
         @Override
         public void close() {
             parentGetter.close();
         }
+
+        private ValueAndTimestamp<KeyValue<K1, V1>> mapValue(final K key, 
final ValueAndTimestamp<V> valueAndTimestamp) {
+            return ValueAndTimestamp.make(
+                mapper.apply(key, getValueOrNull(valueAndTimestamp)),
+                valueAndTimestamp == null ? context.timestamp() : 
valueAndTimestamp.timestamp()
+            );
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 1c340943354..8599a50f60b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -47,5 +47,15 @@ public class KTableSourceValueGetterSupplier<K, V> 
implements KTableValueGetterS
         public ValueAndTimestamp<V> get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) 
{
+            return store.get(key, asOfTimestamp);
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return store.isVersionedStore();
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
index 5975571f0fa..ba1705b033b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -153,8 +153,26 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V> valueAndTimestamp = 
parentGetter.get(key);
+            return transformValue(key, parentGetter.get(key));
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long 
asOfTimestamp) {
+            return transformValue(key, parentGetter.get(key, asOfTimestamp));
+        }
+
+        @Override
+        public boolean isVersioned() {
+            return parentGetter.isVersioned();
+        }
 
+        @Override
+        public void close() {
+            parentGetter.close();
+            valueTransformer.close();
+        }
+
+        private ValueAndTimestamp<VOut> transformValue(final K key, final 
ValueAndTimestamp<V> valueAndTimestamp) {
             final ProcessorRecordContext currentContext = 
internalProcessorContext.recordContext();
 
             internalProcessorContext.setRecordContext(new 
ProcessorRecordContext(
@@ -177,11 +195,5 @@ class KTableTransformValues<K, V, VOut> implements 
KTableProcessorSupplier<K, V,
 
             return result;
         }
-
-        @Override
-        public void close() {
-            parentGetter.close();
-            valueTransformer.close();
-        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index 6303b3d013c..98705cf3407 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -25,5 +25,21 @@ public interface KTableValueGetter<K, V> {
 
     ValueAndTimestamp<V> get(K key);
 
+    /**
+     * Returns the latest record version, associated with the provided key, with timestamp
+     * not exceeding the provided timestamp bound. This method may only be called if
+     * {@link #isVersioned()} is true.
+     */
+    default ValueAndTimestamp<V> get(K key, long asOfTimestamp) {
+        throw new UnsupportedOperationException("get(key, timestamp) is only 
supported for versioned stores");
+    }
+
+    /**
+     * @return whether this value getter supports multiple record versions for the same key.
+     *         If true, then {@link #get(Object, long)} must be implemented. If not, then
+     *         {@link #get(Object, long)} must not be called.
+     */
+    boolean isVersioned();
+
     default void close() {}
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 96c103a39a4..ea635c3ebc2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -89,6 +89,11 @@ public class KTableSuppressProcessorSupplier<K, V> implements
                         }
                     }
 
+                    @Override
+                    public boolean isVersioned() {
+                        return false;
+                    }
+
                     @Override
                     public void close() {
                         // the main processor is responsible for the buffer's 
lifecycle
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
index 9c2920570aa..9fb2dc2123f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java
@@ -81,6 +81,14 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
         throw new IllegalStateException("KeyValueStoreWrapper must be 
initialized with either timestamped or versioned store");
     }
 
+    public ValueAndTimestamp<V> get(final K key, final long asOfTimestamp) {
+        if (!isVersionedStore()) {
+            throw new UnsupportedOperationException("get(key, timestamp) is 
only supported for versioned stores");
+        }
+        final VersionedRecord<V> versionedRecord = versionedStore.get(key, 
asOfTimestamp);
+        return versionedRecord == null ? null : 
ValueAndTimestamp.make(versionedRecord.value(), versionedRecord.timestamp());
+    }
+
     public void put(final K key, final V value, final long timestamp) {
         if (timestampedStore != null) {
             timestampedStore.put(key, ValueAndTimestamp.make(value, 
timestamp));
@@ -97,6 +105,10 @@ public class KeyValueStoreWrapper<K, V> implements 
StateStore {
         return store;
     }
 
+    public boolean isVersionedStore() {
+        return versionedStore != null;
+    }
+
     @Override
     public String name() {
         return store.name();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
index 1bf708b8ab5..6757ccca20b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionProcessorSupplierTest.java
@@ -352,6 +352,11 @@ public class ForeignJoinSubscriptionProcessorSupplierTest {
             public void init(final ProcessorContext context) {
 
             }
+
+            @Override
+            public boolean isVersioned() {
+                return false;
+            }
         };
         return new KTableValueGetterSupplier<String, String>() {
             @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
index dd794b0107d..afd5f490db2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
@@ -56,6 +56,11 @@ public class SubscriptionResolverJoinProcessorSupplierTest {
                 public ValueAndTimestamp<V> get(final K key) {
                     return ValueAndTimestamp.make(map.get(key), -1);
                 }
+
+                @Override
+                public boolean isVersioned() {
+                    return false;
+                }
             };
         }
 

Reply via email to