This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88e2d6b8c23 KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)
88e2d6b8c23 is described below
commit 88e2d6b8c23c548ebd4146b447c80fcda85f5102
Author: Victoria Xia <[email protected]>
AuthorDate: Wed Apr 12 19:54:15 2023 -0400
KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)
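This PR updates the stream-table join processors, including both
KStream-KTable and KStream-GlobalKTable joins, to perform timestamped lookups
when the (global) table is versioned, as specified in KIP-914. With a
versioned table, the join looks up the table value as of the stream record's
timestamp instead of the latest value; unversioned tables keep the existing
latest-value lookup.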
Reviewers: Matthias J. Sax <[email protected]>
---
.../internals/KStreamKTableJoinProcessor.java | 6 +-
.../StreamTableJoinIntegrationTest.java | 83 +++++++++++++++++++-
.../internals/KStreamGlobalKTableJoinTest.java | 78 ++++++++++++++-----
.../internals/KStreamGlobalKTableLeftJoinTest.java | 89 ++++++++++++++++------
4 files changed, 212 insertions(+), 44 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 1d4a012a9fb..53e246167d9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,10 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
}
droppedRecordsSensor.record();
} else {
- final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
+ final ValueAndTimestamp<V2> valueAndTimestamp2 =
valueGetter.isVersioned()
+ ? valueGetter.get(mappedKey, record.timestamp())
+ : valueGetter.get(mappedKey);
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
if (leftJoin || value2 != null) {
context().forward(record.withValue(joiner.apply(record.key(),
record.value(), value2)));
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 144c18a70fb..bd95f57b15a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.streams.integration;
+import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.junit.Before;
@@ -40,6 +43,9 @@ import java.util.List;
@Category({IntegrationTest.class})
@RunWith(value = Parameterized.class)
public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest {
+
+ private static final String STORE_NAME = "table-store";
+
@Rule
public Timeout globalTimeout = Timeout.seconds(600);
private KStream<Long, String> leftStream;
@@ -56,14 +62,16 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
appID = "stream-table-join-integration-test";
builder = new StreamsBuilder();
- rightTable = builder.table(INPUT_TOPIC_RIGHT);
- leftStream = builder.stream(INPUT_TOPIC_LEFT);
}
@Test
public void testInner() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT);
+ leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -86,7 +94,6 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f",
null, 8L))
);
- leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
runTestWithDriver(expectedResult);
}
@@ -94,6 +101,10 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
public void testLeft() {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT);
+ leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
null,
null,
@@ -116,8 +127,74 @@ public class StreamTableJoinIntegrationTest extends
AbstractJoinIntegrationTest
Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f",
null, 8L))
);
+ runTestWithDriver(expectedResult);
+ }
+
+ @Test
+ public void testInnerWithVersionedStore() {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
+
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
+ leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a",
null, 4L)),
+ null,
+ null,
+ null
+ );
+
+ runTestWithDriver(expectedResult);
+ }
+
+ @Test
+ public void testLeftWithVersionedStore() {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
+
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"A-null", null, 3L)),
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"C-null", null, 9L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d",
null, 15L)),
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a",
null, 4L)),
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"F-null", null, 8L))
+ );
+
runTestWithDriver(expectedResult);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 8c65656bd6c..49068f99c52 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import java.util.Optional;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -49,6 +52,8 @@ public class KStreamGlobalKTableJoinTest {
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
+ private TestInputTopic<Integer, String> inputStreamTopic;
+ private TestInputTopic<String, String> inputTableTopic;
private final int[] expectedKeys = {0, 1, 2, 3};
private TopologyTestDriver driver;
@@ -57,7 +62,15 @@ public class KStreamGlobalKTableJoinTest {
@Before
public void setUp() {
+ // use un-versioned store by default
+ init(Optional.empty());
+ }
+
+ private void initWithVersionedStore(final long historyRetentionMs) {
+ init(Optional.of(historyRetentionMs));
+ }
+ private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream
optionally contains key of table
@@ -67,7 +80,12 @@ public class KStreamGlobalKTableJoinTest {
final Consumed<Integer, String> streamConsumed =
Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed =
Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
- table = builder.globalTable(globalTableTopic, tableConsumed);
+ if (versionedStoreHistoryRetentionMs.isPresent()) {
+ table = builder.globalTable(globalTableTopic, tableConsumed,
Materialized.as(
+ Stores.persistentVersionedKeyValueStore("table",
Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
+ } else {
+ table = builder.globalTable(globalTableTopic, tableConsumed);
+ }
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
// Value is comma delimited. If second token is present, it's the
key to the global ktable.
@@ -80,6 +98,10 @@ public class KStreamGlobalKTableJoinTest {
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
+
+ // auto-advance stream timestamps by default, but not global table
timestamps
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ofMillis(1L));
+ inputTableTopic = driver.createInputTopic(globalTableTopic, new
StringSerializer(), new StringSerializer());
}
@After
@@ -88,8 +110,6 @@ public class KStreamGlobalKTableJoinTest {
}
private void pushToStream(final int messageCount, final String
valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
- final TestInputTopic<Integer, String> inputTopic =
- driver.createInputTopic(streamTopic, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@@ -99,23 +119,19 @@ public class KStreamGlobalKTableJoinTest {
if (includeNullKey && i == 0) {
key = null;
}
- inputTopic.pipeInput(key, value);
+ inputStreamTopic.pipeInput(key, value);
}
}
private void pushToGlobalTable(final int messageCount, final String
valuePrefix) {
- final TestInputTopic<String, String> inputTopic =
- driver.createInputTopic(globalTableTopic, new StringSerializer(),
new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix +
expectedKeys[i]);
+ inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix +
expectedKeys[i]);
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
- final TestInputTopic<String, String> inputTopic =
- driver.createInputTopic(globalTableTopic, new StringSerializer(),
new StringSerializer());
for (int i = 0; i < messageCount; i++) {
- inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
+ inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
}
}
@@ -152,8 +168,8 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 2),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
// push all items to the globalTable. this should not produce any item
@@ -163,10 +179,10 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 6),
+ new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+ new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+ new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
// push all items to the globalTable. this should not produce any item
@@ -214,8 +230,8 @@ public class KStreamGlobalKTableJoinTest {
// push all four items to the primary stream. this should produce two
items.
pushToStream(4, "XX", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2,
"XX2,FKey2+Y2", 2),
- new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2,
"XX2,FKey2+Y2", 6),
+ new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
}
@Test
@@ -246,4 +262,30 @@ public class KStreamGlobalKTableJoinTest {
processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null,
"X0,FKey0+Y0", 0),
new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
}
+
+ @Test
+ public void shouldPerformTimestampedGet() {
+ initWithVersionedStore(1000);
+
+ // do not auto-advance stream timestamps for this test
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer());
+
+ // produce out-of-order records including nulls to table
+ inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
+ inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
+ inputTableTopic.pipeInput("FKey1", null, 7);
+ inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
+
+ // produce records to stream side
+ inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
+ inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
+ inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
+ inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
+ inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
+ new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
+ new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index fe6c1d08d9f..08fcb338e54 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import java.util.Optional;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -49,6 +52,8 @@ public class KStreamGlobalKTableLeftJoinTest {
private final String streamTopic = "streamTopic";
private final String globalTableTopic = "globalTableTopic";
+ private TestInputTopic<Integer, String> inputStreamTopic;
+ private TestInputTopic<String, String> inputTableTopic;
private final int[] expectedKeys = {0, 1, 2, 3};
private MockApiProcessor<Integer, String, Void, Void> processor;
@@ -57,7 +62,15 @@ public class KStreamGlobalKTableLeftJoinTest {
@Before
public void setUp() {
+ // use un-versioned store by default
+ init(Optional.empty());
+ }
+
+ private void initWithVersionedStore(final long historyRetentionMs) {
+ init(Optional.of(historyRetentionMs));
+ }
+ private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
builder = new StreamsBuilder();
final KStream<Integer, String> stream;
final GlobalKTable<String, String> table; // value of stream
optionally contains key of table
@@ -67,7 +80,12 @@ public class KStreamGlobalKTableLeftJoinTest {
final Consumed<Integer, String> streamConsumed =
Consumed.with(Serdes.Integer(), Serdes.String());
final Consumed<String, String> tableConsumed =
Consumed.with(Serdes.String(), Serdes.String());
stream = builder.stream(streamTopic, streamConsumed);
- table = builder.globalTable(globalTableTopic, tableConsumed);
+ if (versionedStoreHistoryRetentionMs.isPresent()) {
+ table = builder.globalTable(globalTableTopic, tableConsumed,
Materialized.as(
+ Stores.persistentVersionedKeyValueStore("table",
Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
+ } else {
+ table = builder.globalTable(globalTableTopic, tableConsumed);
+ }
keyMapper = (key, value) -> {
final String[] tokens = value.split(",");
// Value is comma delimited. If second token is present, it's the
key to the global ktable.
@@ -80,6 +98,10 @@ public class KStreamGlobalKTableLeftJoinTest {
driver = new TopologyTestDriver(builder.build(), props);
processor = supplier.theCapturedProcessor();
+
+ // auto-advance timestamps by default
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ofMillis(1L));
+ inputTableTopic = driver.createInputTopic(globalTableTopic, new
StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ofMillis(1L));
}
@After
@@ -88,8 +110,6 @@ public class KStreamGlobalKTableLeftJoinTest {
}
private void pushToStream(final int messageCount, final String
valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
- final TestInputTopic<Integer, String> inputTopic =
- driver.createInputTopic(streamTopic, new IntegerSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
String value = valuePrefix + expectedKeys[i];
if (includeForeignKey) {
@@ -99,23 +119,19 @@ public class KStreamGlobalKTableLeftJoinTest {
if (includeNullKey && i == 0) {
key = null;
}
- inputTopic.pipeInput(key, value);
+ inputStreamTopic.pipeInput(key, value);
}
}
private void pushToGlobalTable(final int messageCount, final String
valuePrefix) {
- final TestInputTopic<String, String> inputTopic =
- driver.createInputTopic(globalTableTopic, new StringSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
- inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix +
expectedKeys[i]);
+ inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix +
expectedKeys[i]);
}
}
private void pushNullValueToGlobalTable(final int messageCount) {
- final TestInputTopic<String, String> inputTopic =
- driver.createInputTopic(globalTableTopic, new StringSerializer(),
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
for (int i = 0; i < messageCount; i++) {
- inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
+ inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
}
}
@@ -154,10 +170,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+Y0", 2),
+ new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
+ new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
+ new KeyValueTimestamp<>(3, "X3,FKey3+null", 5));
// push all items to the globalTable. this should not produce any item
@@ -167,10 +183,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "X", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 0),
- new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
- new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
- new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"X0,FKey0+YY0", 6),
+ new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+ new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+ new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
// push all items to the globalTable. this should not produce any item
@@ -220,10 +236,10 @@ public class KStreamGlobalKTableLeftJoinTest {
// push all four items to the primary stream. this should produce four
items.
pushToStream(4, "XX", true, false);
- processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"XX0,FKey0+null", 0),
- new KeyValueTimestamp<>(1, "XX1,FKey1+null", 1),
- new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
- new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
+ processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0,
"XX0,FKey0+null", 4),
+ new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
+ new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
+ new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
}
@Test
@@ -256,4 +272,33 @@ public class KStreamGlobalKTableLeftJoinTest {
new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
}
+
+ @Test
+ public void shouldPerformTimestampedGet() {
+ initWithVersionedStore(1000);
+
+ // do not auto-advance timestamps for this test
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer());
+ inputTableTopic = driver.createInputTopic(globalTableTopic, new
StringSerializer(), new StringSerializer());
+
+ // produce out-of-order records including nulls to table
+ inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
+ inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
+ inputTableTopic.pipeInput("FKey1", null, 7);
+ inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
+
+ // produce records to stream side
+ inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
+ inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
+ inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
+ inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
+ inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(1, "ValueS8,FKey1+null", 8),
+ new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
+ new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
+ new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10),
+ new KeyValueTimestamp<>(5, "ValueS2,FKey1+null", 2));
+ }
}