This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88e2d6b8c23 KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)
88e2d6b8c23 is described below

commit 88e2d6b8c23c548ebd4146b447c80fcda85f5102
Author: Victoria Xia <[email protected]>
AuthorDate: Wed Apr 12 19:54:15 2023 -0400

    KAFKA-14834: [3/N] Timestamped lookups for stream-table joins (#13509)
    
    This PR updates the stream-table join processors, including both
    KStream-KTable and KStream-GlobalKTable joins, to perform timestamped
    lookups when the (global) table is versioned, as specified in KIP-914.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../internals/KStreamKTableJoinProcessor.java      |  6 +-
 .../StreamTableJoinIntegrationTest.java            | 83 +++++++++++++++++++-
 .../internals/KStreamGlobalKTableJoinTest.java     | 78 ++++++++++++++-----
 .../internals/KStreamGlobalKTableLeftJoinTest.java | 89 ++++++++++++++++------
 4 files changed, 212 insertions(+), 44 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 1d4a012a9fb..53e246167d9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +84,10 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
             }
             droppedRecordsSensor.record();
         } else {
-            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
+            final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter.isVersioned()
+                ? valueGetter.get(mappedKey, record.timestamp())
+                : valueGetter.get(mappedKey);
+            final V2 value2 = getValueOrNull(valueAndTimestamp2);
             if (leftJoin || value2 != null) {
                 context().forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 144c18a70fb..bd95f57b15a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.integration;
 
+import java.time.Duration;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.test.TestRecord;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.Before;
@@ -40,6 +43,9 @@ import java.util.List;
 @Category({IntegrationTest.class})
 @RunWith(value = Parameterized.class)
 public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
     @Rule
     public Timeout globalTimeout = Timeout.seconds(600);
     private KStream<Long, String> leftStream;
@@ -56,14 +62,16 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
         appID = "stream-table-join-integration-test";
 
         builder = new StreamsBuilder();
-        rightTable = builder.table(INPUT_TOPIC_RIGHT);
-        leftStream = builder.stream(INPUT_TOPIC_LEFT);
     }
 
     @Test
     public void testInner() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
 
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -86,7 +94,6 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", 
null,  8L))
         );
 
-        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
         runTestWithDriver(expectedResult);
     }
 
@@ -94,6 +101,10 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
     public void testLeft() {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
 
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+        leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
         final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
             null,
             null,
@@ -116,8 +127,74 @@ public class StreamTableJoinIntegrationTest extends 
AbstractJoinIntegrationTest
             Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", 
null,  8L))
         );
 
+        runTestWithDriver(expectedResult);
+    }
+
+    @Test
+    public void testInnerWithVersionedStore() {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
+
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+            Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
+        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
+
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null,  15L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", 
null,  4L)),
+            null,
+            null,
+            null
+        );
+
+        runTestWithDriver(expectedResult);
+    }
+
+    @Test
+    public void testLeftWithVersionedStore() {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
+
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+            Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
         leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
 
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"A-null", null, 3L)),
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"C-null", null, 9L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", 
null, 15L)),
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-a", 
null,  4L)),
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"F-null", null,  8L))
+        );
+
         runTestWithDriver(expectedResult);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index 8c65656bd6c..49068f99c52 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Optional;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -49,6 +52,8 @@ public class KStreamGlobalKTableJoinTest {
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<String, String> inputTableTopic;
     private final int[] expectedKeys = {0, 1, 2, 3};
 
     private TopologyTestDriver driver;
@@ -57,7 +62,15 @@ public class KStreamGlobalKTableJoinTest {
 
     @Before
     public void setUp() {
+        // use un-versioned store by default
+        init(Optional.empty());
+    }
+
+    private void initWithVersionedStore(final long historyRetentionMs) {
+        init(Optional.of(historyRetentionMs));
+    }
 
+    private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
         final GlobalKTable<String, String> table; // value of stream 
optionally contains key of table
@@ -67,7 +80,12 @@ public class KStreamGlobalKTableJoinTest {
         final Consumed<Integer, String> streamConsumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
         final Consumed<String, String> tableConsumed = 
Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
-        table = builder.globalTable(globalTableTopic, tableConsumed);
+        if (versionedStoreHistoryRetentionMs.isPresent()) {
+            table = builder.globalTable(globalTableTopic, tableConsumed, 
Materialized.as(
+                Stores.persistentVersionedKeyValueStore("table", 
Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
+        } else {
+            table = builder.globalTable(globalTableTopic, tableConsumed);
+        }
         keyMapper = (key, value) -> {
             final String[] tokens = value.split(",");
             // Value is comma delimited. If second token is present, it's the 
key to the global ktable.
@@ -80,6 +98,10 @@ public class KStreamGlobalKTableJoinTest {
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
+
+        // auto-advance stream timestamps by default, but not global table 
timestamps
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ofMillis(1L));
+        inputTableTopic = driver.createInputTopic(globalTableTopic, new 
StringSerializer(), new StringSerializer());
     }
 
     @After
@@ -88,8 +110,6 @@ public class KStreamGlobalKTableJoinTest {
     }
 
     private void pushToStream(final int messageCount, final String 
valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
-        final TestInputTopic<Integer, String> inputTopic =
-            driver.createInputTopic(streamTopic, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
         for (int i = 0; i < messageCount; i++) {
             String value = valuePrefix + expectedKeys[i];
             if (includeForeignKey) {
@@ -99,23 +119,19 @@ public class KStreamGlobalKTableJoinTest {
             if (includeNullKey && i == 0) {
                 key = null;
             }
-            inputTopic.pipeInput(key, value);
+            inputStreamTopic.pipeInput(key, value);
         }
     }
 
     private void pushToGlobalTable(final int messageCount, final String 
valuePrefix) {
-        final TestInputTopic<String, String> inputTopic =
-            driver.createInputTopic(globalTableTopic, new StringSerializer(), 
new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + 
expectedKeys[i]);
+            inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + 
expectedKeys[i]);
         }
     }
 
     private void pushNullValueToGlobalTable(final int messageCount) {
-        final TestInputTopic<String, String> inputTopic =
-            driver.createInputTopic(globalTableTopic, new StringSerializer(), 
new StringSerializer());
         for (int i = 0; i < messageCount; i++) {
-            inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
+            inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
         }
     }
 
@@ -152,8 +168,8 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 2),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -163,10 +179,10 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 6),
+                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -214,8 +230,8 @@ public class KStreamGlobalKTableJoinTest {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "XX", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, 
"XX2,FKey2+Y2", 2),
-                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, 
"XX2,FKey2+Y2", 6),
+                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
     }
 
     @Test
@@ -246,4 +262,30 @@ public class KStreamGlobalKTableJoinTest {
         processor.checkAndClearProcessResult(new KeyValueTimestamp<>(null, 
"X0,FKey0+Y0", 0),
                 new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1));
     }
+
+    @Test
+    public void shouldPerformTimestampedGet() {
+        initWithVersionedStore(1000);
+
+        // do not auto-advance stream timestamps for this test
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer());
+
+        // produce out-of-order records including nulls to table
+        inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
+        inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
+        inputTableTopic.pipeInput("FKey1", null, 7);
+        inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
+
+        // produce records to stream side
+        inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
+        inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
+        inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
+        inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
+        inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
+
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
+            new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
+            new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10));
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index fe6c1d08d9f..08fcb338e54 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Optional;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -28,6 +29,8 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -49,6 +52,8 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     private final String streamTopic = "streamTopic";
     private final String globalTableTopic = "globalTableTopic";
+    private TestInputTopic<Integer, String> inputStreamTopic;
+    private TestInputTopic<String, String> inputTableTopic;
     private final int[] expectedKeys = {0, 1, 2, 3};
 
     private MockApiProcessor<Integer, String, Void, Void> processor;
@@ -57,7 +62,15 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     @Before
     public void setUp() {
+        // use un-versioned store by default
+        init(Optional.empty());
+    }
+
+    private void initWithVersionedStore(final long historyRetentionMs) {
+        init(Optional.of(historyRetentionMs));
+    }
 
+    private void init(final Optional<Long> versionedStoreHistoryRetentionMs) {
         builder = new StreamsBuilder();
         final KStream<Integer, String> stream;
         final GlobalKTable<String, String> table; // value of stream 
optionally contains key of table
@@ -67,7 +80,12 @@ public class KStreamGlobalKTableLeftJoinTest {
         final Consumed<Integer, String> streamConsumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
         final Consumed<String, String> tableConsumed = 
Consumed.with(Serdes.String(), Serdes.String());
         stream = builder.stream(streamTopic, streamConsumed);
-        table = builder.globalTable(globalTableTopic, tableConsumed);
+        if (versionedStoreHistoryRetentionMs.isPresent()) {
+            table = builder.globalTable(globalTableTopic, tableConsumed, 
Materialized.as(
+                Stores.persistentVersionedKeyValueStore("table", 
Duration.ofMillis(versionedStoreHistoryRetentionMs.get()))));
+        } else {
+            table = builder.globalTable(globalTableTopic, tableConsumed);
+        }
         keyMapper = (key, value) -> {
             final String[] tokens = value.split(",");
             // Value is comma delimited. If second token is present, it's the 
key to the global ktable.
@@ -80,6 +98,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         driver = new TopologyTestDriver(builder.build(), props);
 
         processor = supplier.theCapturedProcessor();
+
+        // auto-advance timestamps by default
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ofMillis(1L));
+        inputTableTopic = driver.createInputTopic(globalTableTopic, new 
StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ofMillis(1L));
     }
 
     @After
@@ -88,8 +110,6 @@ public class KStreamGlobalKTableLeftJoinTest {
     }
 
     private void pushToStream(final int messageCount, final String 
valuePrefix, final boolean includeForeignKey, final boolean includeNullKey) {
-        final TestInputTopic<Integer, String> inputTopic =
-            driver.createInputTopic(streamTopic, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
         for (int i = 0; i < messageCount; i++) {
             String value = valuePrefix + expectedKeys[i];
             if (includeForeignKey) {
@@ -99,23 +119,19 @@ public class KStreamGlobalKTableLeftJoinTest {
             if (includeNullKey && i == 0) {
                 key = null;
             }
-            inputTopic.pipeInput(key, value);
+            inputStreamTopic.pipeInput(key, value);
         }
     }
 
     private void pushToGlobalTable(final int messageCount, final String 
valuePrefix) {
-        final TestInputTopic<String, String> inputTopic =
-            driver.createInputTopic(globalTableTopic, new StringSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
         for (int i = 0; i < messageCount; i++) {
-            inputTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + 
expectedKeys[i]);
+            inputTableTopic.pipeInput("FKey" + expectedKeys[i], valuePrefix + 
expectedKeys[i]);
         }
     }
 
     private void pushNullValueToGlobalTable(final int messageCount) {
-        final TestInputTopic<String, String> inputTopic =
-            driver.createInputTopic(globalTableTopic, new StringSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ofMillis(1L));
         for (int i = 0; i < messageCount; i++) {
-            inputTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
+            inputTableTopic.pipeInput("FKey" + expectedKeys[i], (String) null);
         }
     }
 
@@ -154,10 +170,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+null", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+null", 3));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 2),
+                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3),
+                new KeyValueTimestamp<>(2, "X2,FKey2+null", 4),
+                new KeyValueTimestamp<>(3, "X3,FKey3+null", 5));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -167,10 +183,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 0),
-                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 1),
-                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 2),
-                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 3));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+YY0", 6),
+                new KeyValueTimestamp<>(1, "X1,FKey1+YY1", 7),
+                new KeyValueTimestamp<>(2, "X2,FKey2+YY2", 8),
+                new KeyValueTimestamp<>(3, "X3,FKey3+YY3", 9));
 
         // push all items to the globalTable. this should not produce any item
 
@@ -220,10 +236,10 @@ public class KStreamGlobalKTableLeftJoinTest {
         // push all four items to the primary stream. this should produce four 
items.
 
         pushToStream(4, "XX", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"XX0,FKey0+null", 0),
-                new KeyValueTimestamp<>(1, "XX1,FKey1+null", 1),
-                new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 2),
-                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 3));
+        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"XX0,FKey0+null", 4),
+                new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
+                new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
+                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
     }
 
     @Test
@@ -256,4 +272,33 @@ public class KStreamGlobalKTableLeftJoinTest {
                 new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 2),
                 new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 3));
     }
+
+    @Test
+    public void shouldPerformTimestampedGet() {
+        initWithVersionedStore(1000);
+
+        // do not auto-advance timestamps for this test
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer());
+        inputTableTopic = driver.createInputTopic(globalTableTopic, new 
StringSerializer(), new StringSerializer());
+
+        // produce out-of-order records including nulls to table
+        inputTableTopic.pipeInput("FKey1", "ValueT10", 10);
+        inputTableTopic.pipeInput("FKey1", "ValueT5", 5);
+        inputTableTopic.pipeInput("FKey1", null, 7);
+        inputTableTopic.pipeInput("FKey1", "ValueT12", 12);
+
+        // produce records to stream side
+        inputStreamTopic.pipeInput(1, "ValueS8,FKey1", 8);
+        inputStreamTopic.pipeInput(2, "ValueS12,FKey1", 12);
+        inputStreamTopic.pipeInput(3, "ValueS6,FKey1", 6);
+        inputStreamTopic.pipeInput(4, "ValueS10,FKey1", 10);
+        inputStreamTopic.pipeInput(5, "ValueS2,FKey1", 2);
+
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(1, "ValueS8,FKey1+null", 8),
+            new KeyValueTimestamp<>(2, "ValueS12,FKey1+ValueT12", 12),
+            new KeyValueTimestamp<>(3, "ValueS6,FKey1+ValueT5", 6),
+            new KeyValueTimestamp<>(4, "ValueS10,FKey1+ValueT10", 10),
+            new KeyValueTimestamp<>(5, "ValueS2,FKey1+null", 2));
+    }
 }

Reply via email to