This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a958c6cb12 Kafka-14748: Relax non-null FK left-join requirement (#14107)
4a958c6cb12 is described below
commit 4a958c6cb12392746b100431f2ff8ab3c5f53b1a
Author: Florin Akermann <[email protected]>
AuthorDate: Wed Dec 6 01:04:32 2023 +0100
Kafka-14748: Relax non-null FK left-join requirement (#14107)
Relax non-null FK left-join requirement.
Testing Strategy: Inject extractor which returns null on first or second element.
Reviewers: Walker Carlson <[email protected]>
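In short, after this change a left foreign-key join no longer drops a record whose foreignKeyExtractor returns null; it emits a join result whose right-hand side is null. A minimal plain-Java sketch of that contract (the helper names are illustrative, not the Kafka Streams API; the joiner mirrors the "(left,right)" joiner used in the integration test):

```java
import java.util.function.Function;

public class FkLeftJoinSemantics {
    // Mirrors the integration test's joiner: "(" + left + "," + right + ")".
    static String joiner(String left, String right) {
        return "(" + left + "," + right + ")";
    }

    // Sketch of the relaxed left-join contract: a null foreign key no longer
    // drops the record; it produces a result with a null right-hand side.
    static String leftJoinResult(String leftValue,
                                 Function<String, String> foreignKeyExtractor,
                                 Function<String, String> rightLookup) {
        final String fk = foreignKeyExtractor.apply(leftValue);
        final String right = fk == null ? null : rightLookup.apply(fk);
        return joiner(leftValue, right);
    }

    public static void main(String[] args) {
        // FK extractor returns null -> record is still emitted, RHS is null.
        System.out.println(leftJoinResult("lhsValue1|rhs1", v -> null, fk -> "rhsValue"));
        // prints (lhsValue1|rhs1,null)
    }
}
```

Under the old behavior the first call would have produced no output record at all.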
---
docs/streams/developer-guide/dsl-api.html | 4 -
docs/streams/upgrade-guide.html | 31 +++---
.../org/apache/kafka/streams/kstream/KTable.java | 21 ++--
.../internals/foreignkeyjoin/CombinedKey.java | 1 -
.../SubscriptionJoinProcessorSupplier.java | 5 +-
.../SubscriptionReceiveProcessorSupplier.java | 56 ++++++----
.../SubscriptionSendProcessorSupplier.java | 122 ++++++++++-----------
.../KTableKTableForeignKeyJoinIntegrationTest.java | 103 ++++++++++++++++-
8 files changed, 224 insertions(+), 119 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 88f7d6f4ae2..cc729ee25c2 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -2542,10 +2542,6 @@ Function&lt;Long, Long&gt; foreignKeyExtractor = (x) -> x;
<blockquote>
<div>
<ul class="simple">
-    <li>
-        Records for which the <code class="docutils literal"><span class="pre">foreignKeyExtractor</span></code> produces <code class="docutils literal"><span class="pre">null</span></code> are ignored and do not trigger a join.
-        If you want to join with <code class="docutils literal"><span class="pre">null</span></code> foreign keys, use a suitable sentinel value to do so (i.e. <code class="docutils literal"><span class="pre">"NULL"</span></code> for a String field, or <code class="docutils literal"><span class="pre">-1</span></code> for an auto-incrementing integer field).
-    </li>
     <li>Input records with a <code class="docutils literal"><span class="pre">null</span></code> value are interpreted as <em>tombstones</em>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 1f122b6e366..4f905a98d1d 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -136,21 +136,7 @@
    <h3><a id="streams_api_changes_370" href="#streams_api_changes_370">Streams API changes in 3.7.0</a></h3>
<p>
        IQv2 supports <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges, which return data in ascending (byte[]-lexicographical) order (per partition).
-        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()<code> to allow user to receive data in descending order.
-    </p>
-
-    <h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
-    <p>
-        Rack aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
-        Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
-        For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
-    </p>
-
-    <p>
-        IQv2 supports a <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
-        or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
-        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a> closes this gap by allowing to pass in <code>null</code>
-        as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-985%3A+Add+reverseRange+and+reverseAll+query+over+kv-store+in+IQv2">KIP-985</a> extends this functionality by adding <code>.withDescendingKeys()</code> to allow user to receive data in descending order.
</p>
<p>
@@ -198,6 +184,21 @@
</code>
</pre>
</p>
+
+    <h3><a id="streams_api_changes_360" href="#streams_api_changes_360">Streams API changes in 3.6.0</a></h3>
+    <p>
+        Rack aware task assignment was introduced in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-925%3A+Rack+aware+task+assignment+in+Kafka+Streams">KIP-925</a>.
+        Rack aware task assignment can be enabled for <code>StickyTaskAssignor</code> or <code>HighAvailabilityTaskAssignor</code> to compute task assignments which can minimize cross rack traffic under certain conditions.
+        For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/config-streams.html#rack-aware-assignment-strategy"><b>Kafka Streams Developer Guide</b></a>.
+    </p>
+
+    <p>
+        IQv2 supports a <code>RangeQuery</code> that allows to specify unbounded, bounded, or half-open key-ranges. Users have to use <code>withUpperBound(K)</code>, <code>withLowerBound(K)</code>,
+        or <code>withNoBounds()</code> to specify half-open or unbounded ranges, but cannot use <code>withRange(K lower, K upper)</code> for the same.
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds">KIP-941</a> closes this gap by allowing to pass in <code>null</code>
+        as upper and lower bound (with semantics "no bound") to simplify the usage of the <code>RangeQuery</code> class.
+    </p>
+
    <h3><a id="streams_api_changes_350" href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
<p>
A new state store type, versioned key-value stores, was introduced in
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index e530769be4f..0222da70340 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2237,7 +2237,7 @@ public interface KTable<K, V> {
      *
      * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-     *                            result is null, the update is ignored as invalid.
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param <VR>                the value type of the result {@code KTable}
      * @param <KO>                the key type of the other {@code KTable}
@@ -2254,8 +2254,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param named               a {@link Named} config used to name the processor in the topology
      * @param <VR>                the value type of the result {@code KTable}
@@ -2279,9 +2279,8 @@ public interface KTable<K, V> {
      * <p>
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
-     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
      * @param <VR>                the value type of the result {@code KTable}
@@ -2301,7 +2300,7 @@ public interface KTable<K, V> {
      *
      * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
      * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
-     *                            result is null, the update is ignored as invalid.
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
      *                            should be materialized. Cannot be {@code null}
@@ -2321,8 +2320,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param named               a {@link Named} config used to name the processor in the topology
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
@@ -2350,8 +2349,8 @@ public interface KTable<K, V> {
      * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
      *
      * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
-     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V) If the
-     *                            result is null, the update is ignored as invalid.
+     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
+     *                            extract is null, then the right hand side of the result will be null.
      * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
      * @param tableJoined         a {@link TableJoined} used to configure partitioners and names of internal topics and stores
      * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
index ae5f20fe533..01e94da345a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java
@@ -24,7 +24,6 @@ public class CombinedKey<KF, KP> {
private final KP primaryKey;
CombinedKey(final KF foreignKey, final KP primaryKey) {
- Objects.requireNonNull(foreignKey, "foreignKey can't be null");
Objects.requireNonNull(primaryKey, "primaryKey can't be null");
this.foreignKey = foreignKey;
this.primaryKey = primaryKey;
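With the requireNonNull removed, a CombinedKey may now carry a null foreign key (for null-FK left-join subscriptions) while the primary key stays mandatory. A self-contained sketch of that relaxed invariant (simplified class name and accessors, not the actual Kafka internals):

```java
import java.util.Objects;

// Simplified sketch of CombinedKey after this change: the foreign key is
// nullable, the primary key still is not.
public class CombinedKeySketch<KF, KP> {
    private final KF foreignKey;   // nullable as of this change
    private final KP primaryKey;   // still non-null

    CombinedKeySketch(final KF foreignKey, final KP primaryKey) {
        Objects.requireNonNull(primaryKey, "primaryKey can't be null");
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    public KF foreignKey() { return foreignKey; }
    public KP primaryKey() { return primaryKey; }
}
```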
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index 7d31ef44223..a8677ce2958 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -77,7 +77,10 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
                     throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                 }
-                final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(record.key().getForeignKey());
+ final ValueAndTimestamp<VO> foreignValueAndTime =
+ record.key().getForeignKey() == null ?
+ null :
+ foreignValues.get(record.key().getForeignKey());
final long resultTimestamp =
foreignValueAndTime == null ?
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index cf88aec6f9a..90d70bdf330 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -75,20 +75,8 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
@Override
             public void process(final Record<KO, SubscriptionWrapper<K>> record) {
-                if (record.key() == null) {
-                    if (context().recordMetadata().isPresent()) {
-                        final RecordMetadata recordMetadata = context().recordMetadata().get();
-                        LOG.warn(
-                            "Skipping record due to null foreign key. "
-                                + "topic=[{}] partition=[{}] offset=[{}]",
-                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
-                        );
-                    } else {
-                        LOG.warn(
-                            "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                        );
-                    }
-                    droppedRecordsSensor.record();
+                if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().getInstruction())) {
+                    dropRecord();
                     return;
                 }
                 if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
@@ -97,7 +85,22 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                     //from older SubscriptionWrapper versions to newer versions.
                     throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                 }
+                context().forward(
+                    record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
+                        .withValue(inferChange(record))
+                        .withTimestamp(record.timestamp())
+                );
+            }
+
+            private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferChange(final Record<KO, SubscriptionWrapper<K>> record) {
+                if (record.key() == null) {
+                    return new Change<>(ValueAndTimestamp.make(record.value(), record.timestamp()), null);
+                } else {
+                    return inferBasedOnState(record);
+                }
+            }
+
+            private Change<ValueAndTimestamp<SubscriptionWrapper<K>>> inferBasedOnState(final Record<KO, SubscriptionWrapper<K>> record) {
                 final Bytes subscriptionKey = keySchema.toBytes(record.key(), record.value().getPrimaryKey());
                 final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(record.value(), record.timestamp());
@@ -110,14 +113,23 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                 } else {
                     store.put(subscriptionKey, newValue);
                 }
-                final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change = new Change<>(newValue, oldValue);
-                // note: key is non-nullable
-                // note: newValue is non-nullable
-                context().forward(
-                    record.withKey(new CombinedKey<>(record.key(), record.value().getPrimaryKey()))
-                        .withValue(change)
-                        .withTimestamp(newValue.timestamp())
-                );
+                return new Change<>(newValue, oldValue);
+            }
+
+            private void dropRecord() {
+                if (context().recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = context().recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null foreign key. "
+                            + "topic=[{}] partition=[{}] offset=[{}]",
+                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null foreign key. Topic, partition, and offset not known."
+                    );
+                }
+                droppedRecordsSensor.record();
             }
         };
     }
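The receive-side decision above can be sketched without the processor machinery: a null foreign key short-circuits to a Change with no old value and never touches the subscription store, while a non-null key swaps the stored subscription and reports the previous one. A plain Map stands in for the state store; all names here are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

public class ReceiveSideSketch {
    // Minimal stand-in for Change<newValue, oldValue>.
    record Change(String newValue, String oldValue) {}

    // A plain Map stands in for the subscription state store.
    private final Map<String, String> store = new HashMap<>();

    // Mirrors inferChange(): null FK -> Change(newValue, null), no store access;
    // non-null FK -> put into the store and report the replaced value as oldValue.
    Change inferChange(final String foreignKey, final String primaryKey, final String newValue) {
        if (foreignKey == null) {
            return new Change(newValue, null);
        }
        final String key = foreignKey + "|" + primaryKey;
        final String oldValue = store.put(key, newValue);
        return new Change(newValue, oldValue);
    }
}
```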
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
index 8a6298c28de..09bd35339ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.function.Function;
import java.util.function.Supplier;
+import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE;
 import static org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
@@ -86,6 +87,7 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
private Sensor droppedRecordsSensor;
private String foreignKeySerdeTopic;
private String valueSerdeTopic;
+ private long[] recordHash;
@SuppressWarnings("unchecked")
@Override
@@ -109,108 +111,102 @@ public class SubscriptionSendProcessorSupplier<K, KO, V> implements ProcessorSup
@Override
         public void process(final Record<K, Change<V>> record) {
+            // clear cached hash from previous record
+            recordHash = null;
             // drop out-of-order records from versioned tables (cf. KIP-914)
             if (useVersionedSemantics && !record.value().isLatest) {
                 LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
                 droppedRecordsSensor.record();
                 return;
             }
+            if (leftJoin) {
+                leftJoinInstructions(record);
+            } else {
+                defaultJoinInstructions(record);
+            }
+        }

-            final long[] currentHash = record.value().newValue == null ?
-                null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
-
-            final int partition = context().recordMetadata().get().partition();
+        private void leftJoinInstructions(final Record<K, Change<V>> record) {
             if (record.value().oldValue != null) {
                 final KO oldForeignKey = foreignKeyExtractor.apply(record.value().oldValue);
+                final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
+                if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
+                }
+                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            } else if (record.value().newValue != null) {
+                final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+                forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
+            }
+        }
+
+        private void defaultJoinInstructions(final Record<K, Change<V>> record) {
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                     return;
                 }
                 if (record.value().newValue != null) {
-                    final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
+                    final KO newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.apply(record.value().newValue);
                     if (newForeignKey == null) {
                         logSkippedRecordDueToNullForeignKey();
                         return;
                     }
-
-                    final byte[] serialOldForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, oldForeignKey);
-                    final byte[] serialNewForeignKey =
-                        foreignKeySerializer.serialize(foreignKeySerdeTopic, newForeignKey);
-                    if (!Arrays.equals(serialNewForeignKey, serialOldForeignKey)) {
+                    if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
                         //Different Foreign Key - delete the old key value and propagate the new one.
                         //Delete it from the oldKey's state store
-                        context().forward(
-                            record.withKey(oldForeignKey)
-                                .withValue(new SubscriptionWrapper<>(
-                                    currentHash,
-                                    DELETE_KEY_NO_PROPAGATE,
-                                    record.key(),
-                                    partition
-                                )));
-                        //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
-                        //since we must "unset" any output set by the previous FK-join. This is true for both INNER
-                        //and LEFT join.
+                        forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                     }
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
-                                record.key(),
-                                partition
-                            )));
+                    //Add to the newKey's state store. Additionally, propagate null if no FK is found there,
+                    //since we must "unset" any output set by the previous FK-join. This is true for both INNER
+                    //and LEFT join.
+                    forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
                 } else {
-                    //A simple propagatable delete. Delete from the state store and propagate the delete onwards.
-                    context().forward(
-                        record.withKey(oldForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                DELETE_KEY_AND_PROPAGATE,
-                                record.key(),
-                                partition
-                            )));
+                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
                 }
             } else if (record.value().newValue != null) {
-                //change.oldValue is null, which means it was deleted at least once before, or it is brand new.
-                //In either case, we only need to propagate if the FK_VAL is available, as the null from the delete would
-                //have been propagated otherwise.
-
-                final SubscriptionWrapper.Instruction instruction;
-                if (leftJoin) {
-                    //Want to send info even if RHS is null.
-                    instruction = PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE;
-                } else {
-                    instruction = PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE;
-                }
                 final KO newForeignKey = foreignKeyExtractor.apply(record.value().newValue);
                 if (newForeignKey == null) {
                     logSkippedRecordDueToNullForeignKey();
                 } else {
-                    context().forward(
-                        record.withKey(newForeignKey)
-                            .withValue(new SubscriptionWrapper<>(
-                                currentHash,
-                                instruction,
-                                record.key(),
-                                partition)));
+                    forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
                 }
             }
         }
+        private byte[] serialize(final KO key) {
+            return foreignKeySerializer.serialize(foreignKeySerdeTopic, key);
+        }
+
+        private void forward(final Record<K, Change<V>> record, final KO foreignKey, final Instruction deleteKeyNoPropagate) {
+            final SubscriptionWrapper<K> wrapper = new SubscriptionWrapper<>(
+                hash(record),
+                deleteKeyNoPropagate,
+                record.key(),
+                context().recordMetadata().get().partition()
+            );
+            context().forward(record.withKey(foreignKey).withValue(wrapper));
+        }
+
+        private long[] hash(final Record<K, Change<V>> record) {
+            if (recordHash == null) {
+                recordHash = record.value().newValue == null
+                    ? null
+                    : Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, record.value().newValue));
+            }
+            return recordHash;
+        }
+
         private void logSkippedRecordDueToNullForeignKey() {
             if (context().recordMetadata().isPresent()) {
                 final RecordMetadata recordMetadata = context().recordMetadata().get();
                 LOG.warn(
-                    "Skipping record due to null foreign key. "
-                        + "topic=[{}] partition=[{}] offset=[{}]",
+                    "Skipping record due to null foreign key. topic=[{}] partition=[{}] offset=[{}]",
                     recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
                 );
             } else {
-                LOG.warn(
-                    "Skipping record due to null foreign key. Topic, partition, and offset not known."
-                );
+                LOG.warn("Skipping record due to null foreign key. Topic, partition, and offset not known.");
             }
             droppedRecordsSensor.record();
         }
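The new hash(record) helper memoizes the serialized-value hash so that a single input record which fans out into several subscription messages (e.g. DELETE_KEY_AND_PROPAGATE followed by PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE) serializes and hashes its value at most once per record. The caching pattern, sketched with a cheap stand-in instead of Murmur3 (names and the counter are illustrative):

```java
import java.util.Arrays;

public class LazyHashSketch {
    private long[] recordHash;      // cleared at the start of each record
    int hashComputations = 0;       // instrumentation for this sketch only

    // Mirrors `recordHash = null` at the top of process().
    void startRecord() {
        recordHash = null;
    }

    // Mirrors hash(record): compute on first use, reuse on later forwards.
    long[] hash(final byte[] serializedValue) {
        if (recordHash == null) {
            hashComputations++;
            // stand-in for Murmur3.hash128(serializedValue)
            recordHash = new long[] {Arrays.hashCode(serializedValue), serializedValue.length};
        }
        return recordHash;
    }
}
```

Note that in the real code a null newValue leaves recordHash null, so the cache only saves work for non-null values; the repeated null check is cheap.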
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b76b5ddc0db..2bb7b6ea190 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
-import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -34,6 +33,7 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
@@ -41,11 +41,14 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Assertions;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -646,6 +649,85 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
}
+    @Test
+    public void shouldEmitRecordOnNullForeignKeyForLeftJoins() {
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, value -> null);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void shouldEmitRecordWhenOldAndNewFkDiffer() {
+        final Function<String, String> foreignKeyExtractor = value -> {
+            final String split = value.split("\\|")[1];
+            if (split.equals("returnNull")) {
+                //new fk
+                return null;
+            } else {
+                //old fk
+                return split;
+            }
+        };
+        final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, true, rejoin, leftVersioned, rightVersioned, foreignKeyExtractor);
+        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
+            final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
+            final KeyValueStore<String, String> store = driver.getKeyValueStore("store");
+            final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream()
+                .filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE"))
+                .findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey();
+            final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName);
+            final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
+            left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|rhs1,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+                Assertions.assertNotNull(subscriptionStore.get(key));
+            }
+            left.pipeInput("lhs1", "lhsValue1|returnNull", baseTimestamp);
+            {
+                final Map<String, String> expected = mkMap(
+                    mkEntry("lhs1", "(lhsValue1|returnNull,null)")
+                );
+                assertThat(outputTopic.readKeyValuesToMap(), is(expected));
+                if (materialized) {
+                    assertThat(asMap(store), is(expected));
+                }
+                Assertions.assertNull(subscriptionStore.get(key));
+            }
+        }
+    }
+
+    private static Bytes subscriptionStoreKey(final String lhs, final String rhs) {
+        final byte[] lhs1bytes = lhs.getBytes();
+        final byte[] rhs1bytes = rhs.getBytes();
+        final ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + lhs1bytes.length + rhs1bytes.length);
+        buf.putInt(rhs1bytes.length);
+        buf.put(rhs1bytes);
+        buf.put(lhs1bytes);
+        final Bytes key = Bytes.wrap(buf.array());
+        return key;
+    }
+
     protected static Map<String, String> asMap(final KeyValueStore<String, String> store) {
         final HashMap<String, String> result = new HashMap<>();
         store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
@@ -658,6 +740,24 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
                                            final boolean rejoin,
                                            final boolean leftVersioned,
                                            final boolean rightVersioned) {
+        return getTopology(
+            streamsConfig,
+            queryableStoreName,
+            leftJoin,
+            rejoin,
+            leftVersioned,
+            rightVersioned,
+            value -> value.split("\\|")[1]
+        );
+    }
+
+    protected static Topology getTopology(final Properties streamsConfig,
+                                          final String queryableStoreName,
+                                          final boolean leftJoin,
+                                          final boolean rejoin,
+                                          final boolean leftVersioned,
+                                          final boolean rightVersioned,
+                                          final Function<String, String> extractor) {
         final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope();
         final StreamsBuilder builder = new StreamsBuilder();
@@ -693,7 +793,6 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             );
         }

-        final Function<String, String> extractor = value -> value.split("\\|")[1];
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> "(" + value1 + "," + value2 + ")";
         final ValueJoiner<String, String, String> rejoiner = rejoin ? (value1, value2) -> "rejoin(" + value1 + "," + value2 + ")" : null;