This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c76fb5cb9b2 KAFKA-17893: Support record keys in the 
foreignKeyExtractor argument of KTable foreign join (#17756)
c76fb5cb9b2 is described below

commit c76fb5cb9b284e5b5f8b642c7b740ed004f32bb0
Author: Peter Lee <[email protected]>
AuthorDate: Wed Dec 4 00:34:13 2024 +0800

    KAFKA-17893: Support record keys in the foreignKeyExtractor argument of 
KTable foreign join (#17756)
    
    Currently, KTable foreign key joins only allow extracting the foreign key 
from the value of the source record. This forces users to duplicate data that 
might already exist in the key into the value when the foreign key needs to be 
derived from both the key and value. This leads to:
    
    - Data duplication
    - Additional storage overhead
    - Potential data inconsistency if the duplicated data gets out of sync
    - Less intuitive API when the foreign key is naturally derived from both 
key and value
    
    This change allows user to extract the foreign key from the key and value 
of the source record.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../org/apache/kafka/streams/kstream/KTable.java   | 175 ++++++++++++
 .../streams/kstream/internals/KTableImpl.java      | 141 +++++++++-
 .../foreignkeyjoin/ForeignKeyExtractor.java        |  48 ++++
 .../SubscriptionSendProcessorSupplier.java         |  17 +-
 .../KTableKTableForeignKeyJoinScenarioTest.java    | 132 +++++++++
 .../SubscriptionSendProcessorSupplierTest.java     | 304 ++++++++++++++++++++-
 .../kafka/streams/scala/kstream/KTable.scala       |  87 ++++++
 .../kafka/streams/scala/kstream/KTableTest.scala   |  80 ++++++
 8 files changed, 964 insertions(+), 20 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index ed599d0e902..1c8fb3fea39 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
 /**
@@ -2111,6 +2112,24 @@ public interface KTable<K, V> {
                                     final Function<V, KO> foreignKeyExtractor,
                                     final ValueJoiner<V, VO, VR> joiner);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            result is null, the update is ignored as 
invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join,
      * using the {@link TableJoined} instance for optional configurations 
including
@@ -2134,6 +2153,28 @@ public interface KTable<K, V> {
                                     final ValueJoiner<V, VO, VR> joiner,
                                     final TableJoined<K, KO> tableJoined);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join,
+     * using the {@link TableJoined} instance for optional configurations 
including
+     * {@link StreamPartitioner partitioners} when the tables being joined use 
non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            result is null, the update is ignored as 
invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final TableJoined<K, KO> tableJoined);
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join.
      * <p>
@@ -2155,6 +2196,27 @@ public interface KTable<K, V> {
                                     final ValueJoiner<V, VO, VR> joiner,
                                     final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            result is null, the update is ignored as 
invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code 
null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join,
      * using the {@link TableJoined} instance for optional configurations 
including
@@ -2181,6 +2243,32 @@ public interface KTable<K, V> {
                                     final TableJoined<K, KO> tableJoined,
                                     final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed inner join,
+     * using the {@link TableJoined} instance for optional configurations 
including
+     * {@link StreamPartitioner partitioners} when the tables being joined use 
non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            result is null, the update is ignored as 
invalid.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
+     * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code 
null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                    final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                    final ValueJoiner<V, VO, VR> joiner,
+                                    final TableJoined<K, KO> tableJoined,
+                                    final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join.
      * <p>
@@ -2199,6 +2287,24 @@ public interface KTable<K, V> {
                                         final Function<V, KO> 
foreignKeyExtractor,
                                         final ValueJoiner<V, VO, VR> joiner);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            extract is null, then the right hand side of 
the result will be null.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains only those records that satisfy 
the given predicate
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join,
      * using the {@link TableJoined} instance for optional configurations 
including
@@ -2221,6 +2327,28 @@ public interface KTable<K, V> {
                                         final ValueJoiner<V, VO, VR> joiner,
                                         final TableJoined<K, KO> tableJoined);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join,
+     * using the {@link TableJoined} instance for optional configurations 
including
+     * {@link StreamPartitioner partitioners} when the tables being joined use 
non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            extract is null, then the right hand side of 
the result will be null.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final TableJoined<K, KO> tableJoined);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join.
      * <p>
@@ -2242,6 +2370,27 @@ public interface KTable<K, V> {
                                         final ValueJoiner<V, VO, VR> joiner,
                                         final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            extract is null, then the right hand side of 
the result will be null.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code 
null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
+
     /**
      * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join,
      * using the {@link TableJoined} instance for optional configurations 
including
@@ -2268,6 +2417,32 @@ public interface KTable<K, V> {
                                         final TableJoined<K, KO> tableJoined,
                                         final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
 
+    /**
+     * Join records of this {@code KTable} with another {@code KTable} using 
non-windowed left join,
+     * using the {@link TableJoined} instance for optional configurations 
including
+     * {@link StreamPartitioner partitioners} when the tables being joined use 
non-default partitioning,
+     * and also the base name for components of the join.
+     * <p>
+     * This is a foreign key join, where the joining key is determined by the 
{@code foreignKeyExtractor}.
+     *
+     * @param other               the other {@code KTable} to be joined with 
this {@code KTable}. Keyed by KO.
+     * @param foreignKeyExtractor a {@link BiFunction} that extracts the key 
(KO) from this table's key and value (K, V). If the
+     *                            extract is null, then the right hand side of 
the result will be null.
+     * @param joiner              a {@link ValueJoiner} that computes the join 
result for a pair of matching records
+     * @param tableJoined         a {@link TableJoined} used to configure 
partitioners and names of internal topics and stores
+     * @param materialized        a {@link Materialized} that describes how 
the {@link StateStore} for the resulting {@code KTable}
+     *                            should be materialized. Cannot be {@code 
null}
+     * @param <VR>                the value type of the result {@code KTable}
+     * @param <KO>                the key type of the other {@code KTable}
+     * @param <VO>                the value type of the other {@code KTable}
+     * @return a {@code KTable} that contains the result of joining this table 
with {@code other}
+     */
+    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                        final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                        final ValueJoiner<V, VO, VR> joiner,
+                                        final TableJoined<K, KO> tableJoined,
+                                        final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized);
+
     /**
      * Get the name of the local state store used that can be used to query 
this {@code KTable}.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 2c75167f019..a0d5ff4d7c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKeySchema;
+import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignKeyExtractor;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplier;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionJoinProcessorSupplier;
@@ -86,6 +87,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -906,9 +908,25 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
     public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                            final Function<V, KO> 
foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> 
joiner) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
+            joiner,
+            TableJoined.with(null, null),
+            Materialized.with(null, null),
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> 
joiner) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
             joiner,
             TableJoined.with(null, null),
             Materialized.with(null, null),
@@ -921,9 +939,26 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
                                            final Function<V, KO> 
foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final TableJoined<K, KO> 
tableJoined) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
+            joiner,
+            tableJoined,
+            Materialized.with(null, null),
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final TableJoined<K, KO> 
tableJoined) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
             joiner,
             tableJoined,
             Materialized.with(null, null),
@@ -936,7 +971,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
                                            final Function<V, KO> 
foreignKeyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, 
TableJoined.with(null, null), materialized, false);
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(other, adaptedExtractor, joiner, 
TableJoined.with(null, null), materialized, false);
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(other, adaptedExtractor, joiner, 
TableJoined.with(null, null), materialized, false);
     }
 
     @Override
@@ -945,9 +990,27 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final TableJoined<K, KO> 
tableJoined,
                                            final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
+            joiner,
+            tableJoined,
+            materialized,
+            false
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+                                           final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                           final ValueJoiner<V, VO, VR> joiner,
+                                           final TableJoined<K, KO> 
tableJoined,
+                                           final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
             joiner,
             tableJoined,
             materialized,
@@ -959,9 +1022,25 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
     public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                                final Function<V, KO> 
foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> 
joiner) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
+            joiner,
+            TableJoined.with(null, null),
+            Materialized.with(null, null),
+            true
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> 
joiner) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
             joiner,
             TableJoined.with(null, null),
             Materialized.with(null, null),
@@ -974,9 +1053,26 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                                                final Function<V, KO> 
foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> 
joiner,
                                                final TableJoined<K, KO> 
tableJoined) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
+            joiner,
+            tableJoined,
+            Materialized.with(null, null),
+            true
+        );
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> 
joiner,
+                                               final TableJoined<K, KO> 
tableJoined) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
             joiner,
             tableJoined,
             Materialized.with(null, null),
@@ -990,9 +1086,26 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                                                final ValueJoiner<V, VO, VR> 
joiner,
                                                final TableJoined<K, KO> 
tableJoined,
                                                final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(
+            other,
+            adaptedExtractor,
+            joiner,
+            tableJoined,
+            materialized,
+            true);
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> 
joiner,
+                                               final TableJoined<K, KO> 
tableJoined,
+                                               final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
         return doJoinOnForeignKey(
             other,
-            foreignKeyExtractor,
+            adaptedExtractor,
             joiner,
             tableJoined,
             materialized,
@@ -1004,7 +1117,17 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                                                final Function<V, KO> 
foreignKeyExtractor,
                                                final ValueJoiner<V, VO, VR> 
joiner,
                                                final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, 
TableJoined.with(null, null), materialized, true);
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(other, adaptedExtractor, joiner, 
TableJoined.with(null, null), materialized, true);
+    }
+
+    @Override
+    public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+                                               final BiFunction<K, V, KO> 
foreignKeyExtractor,
+                                               final ValueJoiner<V, VO, VR> 
joiner,
+                                               final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final ForeignKeyExtractor<K, V, KO> adaptedExtractor = 
ForeignKeyExtractor.fromBiFunction(foreignKeyExtractor);
+        return doJoinOnForeignKey(other, adaptedExtractor, joiner, 
TableJoined.with(null, null), materialized, true);
     }
 
     private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>> 
getPartition = maybeMulticastPartitions -> {
@@ -1020,7 +1143,7 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
 
     @SuppressWarnings({"unchecked", "deprecation"})
     private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> 
foreignKeyTable,
-                                                          final Function<V, 
KO> foreignKeyExtractor,
+                                                          final 
ForeignKeyExtractor<K, V, KO> foreignKeyExtractor,
                                                           final ValueJoiner<V, 
VO, VR> joiner,
                                                           final TableJoined<K, 
KO> tableJoined,
                                                           final 
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java
new file mode 100644
index 00000000000..481182ee174
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * An interface for extracting foreign keys from input records during foreign 
key joins in Kafka Streams.
+ * This extractor is used to determine the key of the foreign table to join 
with based on the primary
+ * table's record key and value.
+ * <p>
+ * The interface provides two factory methods:
+ * <ul>
+ *   <li>{@link #fromFunction(Function)} - when the foreign key depends only 
on the value</li>
+ *   <li>{@link #fromBiFunction(BiFunction)} - when the foreign key depends on 
both key and value</li>
+ * </ul>
+ *
+ * @param <K>  Type of primary table's key
+ * @param <V>  Type of primary table's value
+ * @param <KO> Type of the foreign key to extract
+ */
+@FunctionalInterface
+public interface ForeignKeyExtractor<K, V, KO> {
+    KO extract(K key, V value);
+
+    static <K, V, KO> ForeignKeyExtractor<K, V, KO> fromFunction(Function<V, 
KO> function) {
+        return (key, value) -> function.apply(value);
+    }
+
+    static <K, V, KO> ForeignKeyExtractor<K, V, KO> 
fromBiFunction(BiFunction<K, V, KO> biFunction) {
+        return biFunction::apply;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index eb864f3910b..10199c242b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
@@ -47,7 +46,7 @@ import static 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.Subscrip
 public class SubscriptionSendProcessorSupplier<K, KO, V> implements 
ProcessorSupplier<K, Change<V>, KO, SubscriptionWrapper<K>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionSendProcessorSupplier.class);
 
-    private final Function<V, KO> foreignKeyExtractor;
+    private final ForeignKeyExtractor<K, V, KO> foreignKeyExtractor;
     private final Supplier<String> foreignKeySerdeTopicSupplier;
     private final Supplier<String> valueSerdeTopicSupplier;
     private final boolean leftJoin;
@@ -55,7 +54,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
     private Serializer<V> valueSerializer;
     private boolean useVersionedSemantics;
 
-    public SubscriptionSendProcessorSupplier(final Function<V, KO> 
foreignKeyExtractor,
+    public SubscriptionSendProcessorSupplier(final ForeignKeyExtractor<K, V, 
KO> foreignKeyExtractor,
                                              final Supplier<String> 
foreignKeySerdeTopicSupplier,
                                              final Supplier<String> 
valueSerdeTopicSupplier,
                                              final Serde<KO> foreignKeySerde,
@@ -129,27 +128,27 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
 
         private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
-                final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
-                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+                final KO oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
                 if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
                     forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
                 forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             } else if (record.value().newValue != null) {
-                final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                final KO newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
                 forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
         }
 
         private void defaultJoinInstructions(final Record<K, Change<V>> 
record) {
             if (record.value().oldValue != null) {
-                final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.apply(record.value().oldValue);
+                final KO oldForeignKey = record.value().oldValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
-                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.apply(record.value().newValue);
+                    final KO newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
@@ -167,7 +166,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> 
implements ProcessorSup
                     forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                final KO newForeignKey = 
foreignKeyExtractor.apply(record.value().newValue);
+                final KO newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
                 if (newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                 } else {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index 6e7073b5bf7..5606933733c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -233,6 +234,137 @@ public class KTableKTableForeignKeyJoinScenarioTest {
         )));
     }
 
+    @Test
+    public void shouldWorkWithCompositeKeyAndProducerIdInValue() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // Left table keyed by <producer_id, product_id>
+        final KTable<String, String> leftTable = builder.table(
+            "left_table",
+            Consumed.with(Serdes.String(), Serdes.String())
+        );
+
+        // Right table keyed by producer_id
+        final KTable<String, String> rightTable = builder.table(
+            "right_table",
+            Consumed.with(Serdes.String(), Serdes.String())
+        );
+
+        // Have to include producer_id in value since foreignKeyExtractor only 
gets value
+        final KTable<String, String> joined = leftTable.join(
+            rightTable,
+            value -> value.split("\\|")[0], // extract producer_id from value
+            (leftValue, rightValue) -> "(" + leftValue + "," + rightValue + 
")",
+            Materialized.as("store")
+        );
+
+        joined.toStream().to("output");
+
+        try (final TopologyTestDriver driver = 
createTopologyTestDriver(builder)) {
+            final TestInputTopic<String, String> leftInput = 
driver.createInputTopic(
+                "left_table",
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            final TestInputTopic<String, String> rightInput = 
driver.createInputTopic(
+                "right_table",
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            final TestOutputTopic<String, String> output = 
driver.createOutputTopic(
+                "output",
+                new StringDeserializer(),
+                new StringDeserializer()
+            );
+
+            // Key format: "producerId:productId"
+            // Left value format: "producerId|productData"
+            leftInput.pipeInput("producer1:product1", 
"producer1|product1-data");
+            leftInput.pipeInput("producer1:product2", 
"producer1|product2-data");
+            leftInput.pipeInput("producer2:product1", 
"producer2|product1-data");
+
+            rightInput.pipeInput("producer1", "producer1-data");
+            rightInput.pipeInput("producer2", "producer2-data");
+
+            final Map<String, String> expectedOutput = new HashMap<>();
+            expectedOutput.put("producer1:product1", 
"(producer1|product1-data,producer1-data)");
+            expectedOutput.put("producer1:product2", 
"(producer1|product2-data,producer1-data)");
+            expectedOutput.put("producer2:product1", 
"(producer2|product1-data,producer2-data)");
+
+            assertThat(output.readKeyValuesToMap(), is(expectedOutput));
+        }
+    }
+
+    @Test
+    public void shouldWorkWithCompositeKeyAndBiFunctionExtractor() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // Left table keyed by <producer_id, product_id>
+        final KTable<String, String> leftTable = builder.table(
+            "left_table",
+            Consumed.with(Serdes.String(), Serdes.String())
+        );
+
+        // Right table keyed by producer_id
+        final KTable<String, String> rightTable = builder.table(
+            "right_table",
+            Consumed.with(Serdes.String(), Serdes.String())
+        );
+
+        // Can extract producer_id from composite key using BiFunction
+        final KTable<String, String> joined = leftTable.join(
+            rightTable,
+            (key, value) -> key.split(":")[0], // extract producer_id from key
+            (leftValue, rightValue) -> "(" + leftValue + "," + rightValue + 
")",
+            Materialized.as("store")
+        );
+
+        joined.toStream().to("output");
+
+        try (final TopologyTestDriver driver = 
createTopologyTestDriver(builder)) {
+            final TestInputTopic<String, String> leftInput = 
driver.createInputTopic(
+                "left_table",
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            final TestInputTopic<String, String> rightInput = 
driver.createInputTopic(
+                "right_table",
+                new StringSerializer(),
+                new StringSerializer()
+            );
+            final TestOutputTopic<String, String> output = 
driver.createOutputTopic(
+                "output",
+                new StringDeserializer(),
+                new StringDeserializer()
+            );
+
+            // Now we don't need producer_id in the value
+            leftInput.pipeInput("producer1:product1", "product1-data");
+            leftInput.pipeInput("producer1:product2", "product2-data");
+            leftInput.pipeInput("producer2:product1", "product1-data");
+
+            rightInput.pipeInput("producer1", "producer1-data");
+            rightInput.pipeInput("producer2", "producer2-data");
+
+            final Map<String, String> expectedOutput = new HashMap<>();
+            expectedOutput.put("producer1:product1", 
"(product1-data,producer1-data)");
+            expectedOutput.put("producer1:product2", 
"(product2-data,producer1-data)");
+            expectedOutput.put("producer2:product1", 
"(product1-data,producer2-data)");
+
+            assertThat(output.readKeyValuesToMap(), is(expectedOutput));
+        }
+    }
+
+    private TopologyTestDriver createTopologyTestDriver(final StreamsBuilder 
builder) {
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"dummy:1234");
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        return new TopologyTestDriver(builder.build(), config);
+    }
+
     private void validateTopologyCanProcessData(final StreamsBuilder builder) {
         final Properties config = new Properties();
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class.getName());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
index def6d9b36d3..43bcc51ce8c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java
@@ -45,7 +45,7 @@ public class SubscriptionSendProcessorSupplierTest {
 
     private final Processor<String, Change<LeftValue>, String, 
SubscriptionWrapper<String>> leftJoinProcessor =
         new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
-            LeftValue::getForeignKey,
+            ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
             () -> "subscription-topic-fk",
             () -> "value-serde-topic",
             Serdes.String(),
@@ -55,7 +55,7 @@ public class SubscriptionSendProcessorSupplierTest {
 
     private final Processor<String, Change<LeftValue>, String, 
SubscriptionWrapper<String>> innerJoinProcessor =
         new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
-            LeftValue::getForeignKey,
+            ForeignKeyExtractor.fromFunction(LeftValue::getForeignKey),
             () -> "subscription-topic-fk",
             () -> "value-serde-topic",
             Serdes.String(),
@@ -327,6 +327,306 @@ public class SubscriptionSendProcessorSupplierTest {
         assertThat(context.forwarded(), empty());
     }
 
+    // Bi-function tests: inner join, left join
+    private final Processor<String, Change<LeftValue>, String, 
SubscriptionWrapper<String>> biFunctionLeftJoinProcessor =
+        new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
+            ForeignKeyExtractor.fromBiFunction((key, value) -> 
value.getForeignKey() == null ? null : key + value.getForeignKey()),
+            () -> "subscription-topic-fk",
+            () -> "value-serde-topic",
+            Serdes.String(),
+            new LeftValueSerializer(),
+            true
+        ).get();
+
+    private final Processor<String, Change<LeftValue>, String, 
SubscriptionWrapper<String>> biFunctionInnerJoinProcessor =
+        new SubscriptionSendProcessorSupplier<String, String, LeftValue>(
+            ForeignKeyExtractor.fromBiFunction((key, value) -> 
value.getForeignKey() == null ? null : key + value.getForeignKey()),
+            () -> "subscription-topic-fk",
+            () -> "value-serde-topic",
+            Serdes.String(),
+            new LeftValueSerializer(),
+            false
+        ).get();
+
+    // Bi-function tests: left join
+    @Test
+    public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNonNullFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, null), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void biFunctionLeftJoinShouldPropagateNewPrimaryKeyWithNullFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, null), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNonNullValue() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk2);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(fk1)), 0));
+
+        final String compositeKey = pk + fk2;
+
+        assertThat(context.forwarded().size(), is(2));
+        assertThat(
+            context.forwarded().get(1).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void biFunctionLeftJoinShouldPropagateNewRecordOfUnchangedFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNullValue() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(fk1)), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), greaterThan(0));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 
0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateChangeFromNullFKToNonNullFKValue() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        final String compositeKey = pk + fk1;
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(null)), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateChangeFromNullFKToNullFKValue() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(null, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKey() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(null, new LeftValue(fk1)), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), greaterThan(0));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKeyThatHadNullFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(null, new LeftValue(null)), 0));
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(null, new SubscriptionWrapper<>(null, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionLeftJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionLeftJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        biFunctionLeftJoinProcessor.process(new Record<>(pk, new 
Change<>(null, null), 0));
+
+        assertThat(context.forwarded(), empty());
+    }
+
+    // Bi-function tests: inner join
+    @Test
+    public void biFunctionInnerJoinShouldPropagateNewPrimaryKey() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk1);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, null), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void biFunctionInnerJoinShouldNotPropagateNewPrimaryKeyWithNullFK() 
{
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, null), 0));
+
+        assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
+    }
+
+    @Test
+    public void biFunctionInnerJoinShouldDeleteOldAndPropagateNewFK() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(fk2);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, new LeftValue(fk1)), 0));
+
+        final String compositeKey1 = pk + fk1;
+        final String compositeKey2 = pk + fk2;
+
+        assertThat(context.forwarded().size(), is(2));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey1, new 
SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 
0))
+        );
+        assertThat(
+            context.forwarded().get(1).record(),
+            is(new Record<>(compositeKey2, new 
SubscriptionWrapper<>(hash(leftRecordValue), 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        final LeftValue leftRecordValue = new LeftValue(null);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(leftRecordValue, leftRecordValue), 0));
+
+        assertThat(context.forwarded(), empty());
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
+    }
+
+    @Test
+    public void biFunctionInnerJoinShouldPropagateDeletionOfPrimaryKey() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(null, new LeftValue(fk1)), 0));
+
+        final String compositeKey = pk + fk1;
+
+        assertThat(context.forwarded().size(), is(1));
+        assertThat(
+            context.forwarded().get(0).record(),
+            is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, 
DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
+        );
+    }
+
+    @Test
+    public void 
biFunctionInnerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
+        final MockInternalNewProcessorContext<String, 
SubscriptionWrapper<String>> context = new MockInternalNewProcessorContext<>();
+        biFunctionInnerJoinProcessor.init(context);
+        context.setRecordMetadata("topic", 0, 0);
+
+        biFunctionInnerJoinProcessor.process(new Record<>(pk, new 
Change<>(null, null), 0));
+
+        assertThat(context.forwarded(), empty());
+    }
+
     private static class LeftValueSerializer implements Serializer<LeftValue> {
         @Override
         public byte[] serialize(final String topic, final LeftValue data) {
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 590e4b08fbd..6a7f42285a6 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.scala
 package kstream
 
+import scala.jdk.FunctionWrappers.AsJavaBiFunction
 import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, TableJoined, 
ValueJoiner, ValueTransformerWithKeySupplier}
 import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
@@ -643,6 +644,26 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, 
materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed inner join. Records from this
+   * table are joined according to the result of keyExtractor on the other 
KTable.
+   *
+   * @param other        the other [[KTable]] to be joined with this 
[[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this 
table's key and value
+   * @param joiner       a function that computes the join result for a pair 
of matching records
+   * @param materialized a `Materialized` that describes how the `StateStore` 
for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: (K, V) => KO,
+    joiner: ValueJoiner[V, VO, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.join(other.inner, AsJavaBiFunction[K, V, 
KO](keyExtractor), joiner, materialized))
+
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed inner join. Records from this
    * table are joined according to the result of keyExtractor on the other 
KTable.
@@ -666,6 +687,29 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.join(other.inner, keyExtractor.asJavaFunction, joiner, 
tableJoined, materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed inner join. Records from this
+   * table are joined according to the result of keyExtractor on the other 
KTable.
+   *
+   * @param other        the other [[KTable]] to be joined with this 
[[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this 
table's key and value
+   * @param joiner       a function that computes the join result for a pair 
of matching records
+   * @param tableJoined  a `org.apache.kafka.streams.kstream.TableJoined` used 
to configure
+   *                     partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` 
for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def join[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: (K, V) => KO,
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.join(other.inner, AsJavaBiFunction[K, V, 
KO](keyExtractor), joiner, tableJoined, materialized))
+
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed left join. Records from this
    * table are joined according to the result of keyExtractor on the other 
KTable.
@@ -686,6 +730,26 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, 
joiner, materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed left join. Records from this
+   * table are joined according to the result of keyExtractor on the other 
KTable.
+   *
+   * @param other        the other [[KTable]] to be joined with this 
[[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this 
table's key and value
+   * @param joiner       a function that computes the join result for a pair 
of matching records
+   * @param materialized a `Materialized` that describes how the `StateStore` 
for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: (K, V) => KO,
+    joiner: ValueJoiner[V, VO, VR],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.leftJoin(other.inner, AsJavaBiFunction[K, V, 
KO](keyExtractor), joiner, materialized))
+
   /**
    * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed left join. Records from this
    * table are joined according to the result of keyExtractor on the other 
KTable.
@@ -709,6 +773,29 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
   ): KTable[K, VR] =
     new KTable(inner.leftJoin(other.inner, keyExtractor.asJavaFunction, 
joiner, tableJoined, materialized))
 
+  /**
+   * Join records of this [[KTable]] with another [[KTable]]'s records using 
non-windowed left join. Records from this
+   * table are joined according to the result of keyExtractor on the other 
KTable.
+   *
+   * @param other        the other [[KTable]] to be joined with this 
[[KTable]], keyed on the value obtained from keyExtractor
+   * @param keyExtractor a function that extracts the foreign key from this 
table's key and value
+   * @param joiner       a function that computes the join result for a pair 
of matching records
+   * @param tableJoined  a `org.apache.kafka.streams.kstream.TableJoined` used 
to configure
+   *                     partitioners and names of internal topics and stores
+   * @param materialized a `Materialized` that describes how the `StateStore` 
for the resulting [[KTable]]
+   *                     should be materialized.
+   * @return a [[KTable]] that contains join-records for each key and values 
computed by the given joiner,
+   *         one for each matched record-pair with the same key
+   */
+  def leftJoin[VR, KO, VO](
+    other: KTable[KO, VO],
+    keyExtractor: (K, V) => KO,
+    joiner: ValueJoiner[V, VO, VR],
+    tableJoined: TableJoined[K, KO],
+    materialized: Materialized[K, VR, KeyValueStore[Bytes, Array[Byte]]]
+  ): KTable[K, VR] =
+    new KTable(inner.leftJoin(other.inner, AsJavaBiFunction[K, V, 
KO](keyExtractor), joiner, tableJoined, materialized))
+
   /**
    * Get the name of the local state store used that can be used to query this 
[[KTable]].
    *
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index 36031907339..e473c6579af 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -534,4 +534,84 @@ class KTableTest extends TestDriver {
 
     testDriver.close()
   }
+
+  @Test
+  def testJoinWithBiFunctionKeyExtractor(): Unit = {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val table1 = builder.stream[String, String](sourceTopic1).toTable
+    val table2 = builder.stream[String, String](sourceTopic2).toTable
+
+    table1
+      .join[String, String, String](
+        table2,
+        (key: String, value: String) => s"$key-$value",
+        joiner = (v1: String, v2: String) => s"$v1+$v2",
+        materialized = Materialized.`with`[String, String, 
ByteArrayKeyValueStore]
+      )
+      .toStream
+      .to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+    val testInput1 = testDriver.createInput[String, String](sourceTopic1)
+    val testInput2 = testDriver.createInput[String, String](sourceTopic2)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    testInput1.pipeInput("k1", "v1")
+    testInput2.pipeInput("k1-v1", "v2")
+
+    val record = testOutput.readKeyValue
+    assertEquals("k1", record.key)
+    assertEquals("v1+v2", record.value)
+
+    testDriver.close()
+  }
+
+  @Test
+  def testLeftJoinWithBiFunctionKeyExtractor(): Unit = {
+    val builder = new StreamsBuilder()
+    val sourceTopic1 = "source1"
+    val sourceTopic2 = "source2"
+    val sinkTopic = "sink"
+
+    val table1 = builder.stream[String, String](sourceTopic1).toTable
+    val table2 = builder.stream[String, String](sourceTopic2).toTable
+
+    table1
+      .leftJoin[String, String, String](
+        table2,
+        (key: String, value: String) => s"$key-$value",
+        joiner = (v1: String, v2: String) => 
s"${v1}+${Option(v2).getOrElse("null")}",
+        materialized = Materialized.`with`[String, String, 
ByteArrayKeyValueStore]
+      )
+      .toStream
+      .to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+    val testInput1 = testDriver.createInput[String, String](sourceTopic1)
+    val testInput2 = testDriver.createInput[String, String](sourceTopic2)
+    val testOutput = testDriver.createOutput[String, String](sinkTopic)
+
+    // First insert into the foreign key table (table2)
+    testInput2.pipeInput("k1-v1", "v2")
+
+    // Then insert into the primary table (table1)
+    testInput1.pipeInput("k1", "v1")
+
+    val record1 = testOutput.readKeyValue
+    assertEquals("k1", record1.key)
+    assertEquals("v1+v2", record1.value)
+
+    // Test with non-matching foreign key (should still output due to left 
join)
+    testInput1.pipeInput("k2", "v3")
+
+    val record2 = testOutput.readKeyValue
+    assertEquals("k2", record2.key)
+    assertEquals("v3+null", record2.value)
+
+    testDriver.close()
+  }
 }

Reply via email to