This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 0956966 KAFKA-9058: Lift queriable and materialized restrictions on
FK Join (#7541)
0956966 is described below
commit 09569668ae2ae3e86f2e6de7819f9496fc12a1f9
Author: John Roesler <[email protected]>
AuthorDate: Thu Oct 17 13:42:10 2019 -0500
KAFKA-9058: Lift queriable and materialized restrictions on FK Join (#7541)
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../org/apache/kafka/streams/kstream/KTable.java | 90 +++++++++-
.../streams/kstream/internals/KTableImpl.java | 123 ++++++++++---
.../streams/integration/ForeignKeyJoinSuite.java | 1 +
.../KTableKTableForeignKeyJoinIntegrationTest.java | 58 +++----
...reignKeyJoinMaterializationIntegrationTest.java | 192 +++++++++++++++++++++
5 files changed, 403 insertions(+), 61 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 5092d35..f9b38bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -2128,18 +2128,54 @@ public interface KTable<K, V> {
* @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
* result is null, the update is ignored as
invalid.
* @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
+ * @param <VR> the value type of the result {@code KTable}
+ * @param <KO> the key type of the other {@code KTable}
+ * @param <VO> the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
+ */
+ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+ final Function<V, KO> foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR> joiner);
+
+ /**
+ * Join records of this {@code KTable} with another {@code KTable} using
non-windowed inner join.
+ * <p>
+ * This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
+ *
+ * @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
+ * result is null, the update is ignored as
invalid.
+ * @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
* @param named a {@link Named} config used to name the
processor in the topology
+ * @param <VR> the value type of the result {@code KTable}
+ * @param <KO> the key type of the other {@code KTable}
+ * @param <VO> the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
+ */
+ <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+ final Function<V, KO> foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR> joiner,
+ final Named named);
+
+ /**
+ * Join records of this {@code KTable} with another {@code KTable} using
non-windowed inner join.
+ * <p>
+ * This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
+ *
+ * @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
+ * result is null, the update is ignored as
invalid.
+ * @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
* @param materialized a {@link Materialized} that describes how
the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code
null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
- * @return
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
- final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized);
/**
@@ -2151,16 +2187,18 @@ public interface KTable<K, V> {
* @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
* result is null, the update is ignored as
invalid.
* @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
+ * @param named a {@link Named} config used to name the
processor in the topology
* @param materialized a {@link Materialized} that describes how
the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code
null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
- * @return
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
*/
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
+ final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized);
/**
@@ -2169,21 +2207,57 @@ public interface KTable<K, V> {
* This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
+ * result is null, the update is ignored as
invalid.
+ * @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
+ * @param <VR> the value type of the result {@code KTable}
+ * @param <KO> the key type of the other {@code KTable}
+ * @param <VO> the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
+ */
+ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR> joiner);
+
+ /**
+ * Join records of this {@code KTable} with another {@code KTable} using
non-windowed left join.
+ * <p>
+ * This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
+ *
+ * @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
* @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V) If the
* result is null, the update is ignored as
invalid.
* @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
* @param named a {@link Named} config used to name the
processor in the topology
+ * @param <VR> the value type of the result {@code KTable}
+ * @param <KO> the key type of the other {@code KTable}
+ * @param <VO> the value type of the other {@code KTable}
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
+ */
+ <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR> joiner,
+ final Named named);
+
+ /**
+ * Join records of this {@code KTable} with another {@code KTable} using
non-windowed left join.
+ * <p>
+ * This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
+ *
+ * @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
+ * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
+ * result is null, the update is ignored as
invalid.
+ * @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
* @param materialized a {@link Materialized} that describes how
the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code
null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
- * @return a {@code KTable} that contains only those records that satisfy
the given predicate
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO>
foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
- final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized);
/**
@@ -2192,19 +2266,21 @@ public interface KTable<K, V> {
* This is a foreign key join, where the joining key is determined by the
{@code foreignKeyExtractor}.
*
* @param other the other {@code KTable} to be joined with
this {@code KTable}. Keyed by KO.
- * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
+ * @param foreignKeyExtractor a {@link Function} that extracts the key
(KO) from this table's value (V). If the
* result is null, the update is ignored as
invalid.
* @param joiner a {@link ValueJoiner} that computes the join
result for a pair of matching records
+ * @param named a {@link Named} config used to name the
processor in the topology
* @param materialized a {@link Materialized} that describes how
the {@link StateStore} for the resulting {@code KTable}
* should be materialized. Cannot be {@code
null}
* @param <VR> the value type of the result {@code KTable}
* @param <KO> the key type of the other {@code KTable}
* @param <VO> the value type of the other {@code KTable}
- * @return a {@code KTable} that contains only those records that satisfy
the given predicate
+ * @return a {@code KTable} that contains the result of joining this table
with {@code other}
*/
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO>
foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
+ final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized);
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 05e04e8..301710d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -110,12 +110,14 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
private static final String TRANSFORMVALUES_NAME =
"KTABLE-TRANSFORMVALUES-";
- private static final String FK_JOIN_STATE_STORE_NAME =
"KTABLE-INTERNAL-SUBSCRIPTION-STATE-STORE-";
- private static final String SUBSCRIPTION_REGISTRATION =
"KTABLE-SUBSCRIPTION-REGISTRATION-";
- private static final String SUBSCRIPTION_RESPONSE =
"KTABLE-SUBSCRIPTION-RESPONSE-";
- private static final String SUBSCRIPTION_PROCESSOR =
"KTABLE-SUBSCRIPTION-PROCESSOR-";
- private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR =
"KTABLE-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
- private static final String FK_JOIN_OUTPUT_PROCESSOR =
"KTABLE-OUTPUT-PROCESSOR-";
+ private static final String FK_JOIN = "KTABLE-FK-JOIN-";
+ private static final String FK_JOIN_STATE_STORE_NAME = FK_JOIN +
"SUBSCRIPTION-STATE-STORE-";
+ private static final String SUBSCRIPTION_REGISTRATION = FK_JOIN +
"SUBSCRIPTION-REGISTRATION-";
+ private static final String SUBSCRIPTION_RESPONSE = FK_JOIN +
"SUBSCRIPTION-RESPONSE-";
+ private static final String SUBSCRIPTION_PROCESSOR = FK_JOIN +
"SUBSCRIPTION-PROCESSOR-";
+ private static final String SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR =
FK_JOIN + "SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-";
+ private static final String FK_JOIN_OUTPUT_NAME = FK_JOIN + "OUTPUT-";
+
private static final String TOPIC_SUFFIX = "-topic";
private static final String SINK_NAME = "KTABLE-SINK-";
@@ -836,20 +838,76 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR>
joiner) {
+ return doJoinOnForeignKey(
+ other,
+ foreignKeyExtractor,
+ joiner,
+ NamedInternal.empty(),
+ Materialized.with(null, null),
+ false
+ );
+ }
+
+ @Override
+ public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
- final Named named,
- final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized) {
+ final Named named) {
+ return doJoinOnForeignKey(
+ other,
+ foreignKeyExtractor,
+ joiner,
+ named,
+ Materialized.with(null, null),
+ false
+ );
+ }
- return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named,
new MaterializedInternal<>(materialized), false);
+ @Override
+ public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR> joiner,
+ final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized) {
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner,
NamedInternal.empty(), materialized, false);
}
@Override
public <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO>
foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
+ final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized) {
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named,
materialized, false);
+ }
+
+ @Override
+ public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR>
joiner) {
+ return doJoinOnForeignKey(
+ other,
+ foreignKeyExtractor,
+ joiner,
+ NamedInternal.empty(),
+ Materialized.with(null, null),
+ true
+ );
+ }
- return doJoinOnForeignKey(other, foreignKeyExtractor, joiner,
NamedInternal.empty(), new MaterializedInternal<>(materialized), false);
+ @Override
+ public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
+ final Function<V, KO>
foreignKeyExtractor,
+ final ValueJoiner<V, VO, VR>
joiner,
+ final Named named) {
+ return doJoinOnForeignKey(
+ other,
+ foreignKeyExtractor,
+ joiner,
+ named,
+ Materialized.with(null, null),
+ true
+ );
}
@Override
@@ -858,7 +916,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final ValueJoiner<V, VO, VR>
joiner,
final Named named,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized) {
- return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named,
new MaterializedInternal<>(materialized), true);
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, named,
materialized, true);
}
@Override
@@ -866,23 +924,21 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
final Function<V, KO>
foreignKeyExtractor,
final ValueJoiner<V, VO, VR>
joiner,
final Materialized<K, VR,
KeyValueStore<Bytes, byte[]>> materialized) {
-
- return doJoinOnForeignKey(other, foreignKeyExtractor, joiner,
NamedInternal.empty(), new MaterializedInternal<>(materialized), true);
+ return doJoinOnForeignKey(other, foreignKeyExtractor, joiner,
NamedInternal.empty(), materialized, true);
}
-
@SuppressWarnings("unchecked")
private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO>
foreignKeyTable,
final Function<V,
KO> foreignKeyExtractor,
final ValueJoiner<V,
VO, VR> joiner,
final Named joinName,
- final
MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
+ final
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final boolean
leftJoin) {
Objects.requireNonNull(foreignKeyTable, "foreignKeyTable can't be
null");
Objects.requireNonNull(foreignKeyExtractor, "foreignKeyExtractor can't
be null");
Objects.requireNonNull(joiner, "joiner can't be null");
Objects.requireNonNull(joinName, "joinName can't be null");
- Objects.requireNonNull(materializedInternal, "materialized can't be
null");
+ Objects.requireNonNull(materialized, "materialized can't be null");
//Old values are a useful optimization. The old values from the
foreignKeyTable table are compared to the new values,
//such that identical values do not cause a prefixScan. PrefixScan and
propagation can be expensive and should
@@ -1011,14 +1067,15 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
builder.internalTopologyBuilder.copartitionSources(resultSourceNodes);
final KTableValueGetterSupplier<K, V> primaryKeyValueGetter =
valueGetterSupplier();
+ final SubscriptionResolverJoinProcessorSupplier<K, V, VO, VR>
resolverProcessorSupplier = new SubscriptionResolverJoinProcessorSupplier<>(
+ primaryKeyValueGetter,
+ valueSerde().serializer(),
+ joiner,
+ leftJoin
+ );
final StatefulProcessorNode<K, SubscriptionResponseWrapper<VO>>
resolverNode = new StatefulProcessorNode<>(
new ProcessorParameters<>(
- new SubscriptionResolverJoinProcessorSupplier<>(
- primaryKeyValueGetter,
- valueSerde().serializer(),
- joiner,
- leftJoin
- ),
+ resolverProcessorSupplier,
renamed.suffixWithOrElseGet("-subscription-response-resolver",
builder, SUBSCRIPTION_RESPONSE_RESOLVER_PROCESSOR)
),
Collections.emptySet(),
@@ -1026,8 +1083,26 @@ public class KTableImpl<K, S, V> extends
AbstractStream<K, V> implements KTable<
);
builder.addGraphNode(foreignResponseSource, resolverNode);
- final String resultProcessorName =
renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_PROCESSOR);
- final KTableSource<K, VR> resultProcessorSupplier = new
KTableSource<>(materializedInternal.storeName(),
materializedInternal.queryableStoreName());
+ final String resultProcessorName =
renamed.suffixWithOrElseGet("-result", builder, FK_JOIN_OUTPUT_NAME);
+
+ final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>>
materializedInternal =
+ new MaterializedInternal<>(
+ materialized,
+ builder,
+ FK_JOIN_OUTPUT_NAME
+ );
+
+ // If no key serde was supplied, fall back to this table's key serde, since
the result table has the same key type (K).
+ // No value serde can be inferred, because result values (VR) come from the joiner.
+ if (materializedInternal.keySerde() == null) {
+ materializedInternal.withKeySerde(keySerde);
+ }
+
+ final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
+ materializedInternal.storeName(),
+ materializedInternal.queryableStoreName()
+ );
+
final StoreBuilder<TimestampedKeyValueStore<K, VR>> resultStore =
materializedInternal.queryableStoreName() == null
? null
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
index ac6adb8..47e2d95 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ForeignKeyJoinSuite.java
@@ -38,6 +38,7 @@ import org.junit.runners.Suite;
BytesTest.class,
KTableKTableForeignKeyInnerJoinMultiIntegrationTest.class,
KTableKTableForeignKeyJoinIntegrationTest.class,
+ KTableKTableForeignKeyJoinMaterializationIntegrationTest.class,
CombinedKeySchemaTest.class,
SubscriptionWrapperSerdeTest.class,
SubscriptionResponseWrapperSerdeTest.class,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index 14a39b5..80c0f52 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
@@ -30,7 +29,6 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -54,7 +52,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-@RunWith(value = Parameterized.class)
+@RunWith(Parameterized.class)
public class KTableKTableForeignKeyJoinIntegrationTest {
private static final String LEFT_TABLE = "left_table";
@@ -265,10 +263,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
@Test
- public void shouldEmitTombstonedWhenDeletingNonJoiningRecords() {
+ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
final Topology topology = getTopology(streamsConfig, "store",
leftJoin);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
- final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
@@ -295,7 +292,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
{
assertThat(
outputTopic.readKeyValuesToMap(),
- is(Utils.<String, String>mkMap(mkEntry("lhs1", null)))
+ is(mkMap(mkEntry("lhs1", null)))
);
assertThat(
asMap(store),
@@ -322,7 +319,6 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords() {
final Topology topology = getTopology(streamsConfig, "store",
leftJoin);
try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
- final TestInputTopic<String, String> right =
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new
StringSerializer());
final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
@@ -369,9 +365,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)
- ))
+ is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" :
null)))
);
assertThat(
asMap(store),
@@ -381,9 +375,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
- is(mkMap(
- mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)
- ))
+ is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" :
null)))
);
assertThat(
asMap(store),
@@ -448,29 +440,35 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
final KTable<String, String> left = builder.table(LEFT_TABLE,
Consumed.with(Serdes.String(), Serdes.String()));
final KTable<String, String> right = builder.table(RIGHT_TABLE,
Consumed.with(Serdes.String(), Serdes.String()));
+ final Function<String, String> extractor = value ->
value.split("\\|")[1];
+ final ValueJoiner<String, String, String> joiner = (value1, value2) ->
"(" + value1 + "," + value2 + ")";
+
final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized =
Materialized.<String,
String>as(Stores.inMemoryKeyValueStore(queryableStoreName))
- .withKeySerde(Serdes.String())
.withValueSerde(Serdes.String())
+ // the cache suppresses some of the unnecessary tombstones we
want to make assertions about
.withCachingDisabled();
- final Function<String, String> extractor = value ->
value.split("\\|")[1];
- final ValueJoiner<String, String, String> joiner = (value1, value2) ->
"(" + value1 + "," + value2 + ")";
+ final KTable<String, String> joinResult;
+ if (leftJoin) {
+ joinResult = left.leftJoin(
+ right,
+ extractor,
+ joiner,
+ materialized
+ );
+ } else {
+ joinResult = left.join(
+ right,
+ extractor,
+ joiner,
+ materialized
+ );
+ }
- if (leftJoin)
- left.leftJoin(right,
- extractor,
- joiner,
- materialized)
- .toStream()
- .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
- else
- left.join(right,
- extractor,
- joiner,
- materialized)
- .toStream()
- .to(OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
+ joinResult
+ .toStream()
+ .to(OUTPUT);
return builder.build(streamsConfig);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
new file mode 100644
index 0000000..6abedb2
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class KTableKTableForeignKeyJoinMaterializationIntegrationTest {
+
+ private static final String LEFT_TABLE = "left_table";
+ private static final String RIGHT_TABLE = "right_table";
+ private static final String OUTPUT = "output-topic";
+ private final Properties streamsConfig;
+ private final boolean materialized;
+ private final boolean queriable;
+
+ public KTableKTableForeignKeyJoinMaterializationIntegrationTest(final
boolean materialized, final boolean queriable) {
+ this.materialized = materialized;
+ this.queriable = queriable;
+ streamsConfig = mkProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG,
"ktable-ktable-joinOnForeignKey"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "asdf:0000"),
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath())
+ ));
+ }
+
+ @Parameterized.Parameters(name = "materialized={0}, queriable={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[] {false, false},
+ new Object[] {true, false},
+ new Object[] {true, true}
+ );
+ }
+
+ @Test
+ public void shouldEmitTombstoneWhenDeletingNonJoiningRecords() {
+ final Topology topology = getTopology(streamsConfig, "store");
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(topology, streamsConfig)) {
+ final TestInputTopic<String, String> left =
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new
StringSerializer());
+ final TestOutputTopic<String, String> outputTopic =
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new
StringDeserializer());
+ final KeyValueStore<String, String> store =
driver.getKeyValueStore("store");
+
+ left.pipeInput("lhs1", "lhsValue1|rhs1");
+
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ if (materialized && queriable) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
+
+ // Deleting a non-joining record produces an unnecessary tombstone
for inner joins, because
+ // it's not possible to know whether a result was previously
emitted.
+ left.pipeInput("lhs1", (String) null);
+ {
+ if (materialized && queriable) {
+ // in only this specific case, the record cache will
actually be activated and
+ // suppress the unnecessary tombstone. This is because the
cache is able to determine
+ // for sure that there has never been a previous result.
(Because the "old" and "new" values
+ // are both null, and the underlying store is also missing
the record in question).
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ } else {
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(mkMap(mkEntry("lhs1", null)))
+ );
+ }
+ }
+
+ // Deleting a non-existing record is idempotent
+ left.pipeInput("lhs1", (String) null);
+ {
+ assertThat(
+ outputTopic.readKeyValuesToMap(),
+ is(emptyMap())
+ );
+ if (materialized && queriable) {
+ assertThat(
+ asMap(store),
+ is(emptyMap())
+ );
+ }
+ }
+ }
+ }
+
+ private static Map<String, String> asMap(final KeyValueStore<String,
String> store) {
+ final HashMap<String, String> result = new HashMap<>();
+ store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
+ return result;
+ }
+
+ private Topology getTopology(final Properties streamsConfig,
+ final String queryableStoreName) {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final KTable<String, String> left = builder.table(LEFT_TABLE,
Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> right = builder.table(RIGHT_TABLE,
Consumed.with(Serdes.String(), Serdes.String()));
+
+ final Function<String, String> extractor = value ->
value.split("\\|")[1];
+ final ValueJoiner<String, String, String> joiner = (value1, value2) ->
"(" + value1 + "," + value2 + ")";
+
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized;
+ if (queriable) {
+ materialized = Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as(queryableStoreName).withValueSerde(Serdes.String());
+ } else {
+ materialized = Materialized.with(null, Serdes.String());
+ }
+
+ final KTable<String, String> joinResult;
+ if (this.materialized) {
+ joinResult = left.join(
+ right,
+ extractor,
+ joiner,
+ materialized
+ );
+ } else {
+ joinResult = left.join(
+ right,
+ extractor,
+ joiner
+ );
+ }
+
+ joinResult
+ .toStream()
+ .to(OUTPUT);
+
+ return builder.build(streamsConfig);
+ }
+}