This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bc770d7b61 MINOR: avoid WARN logs in KafkaStreams test (#18517)
0bc770d7b61 is described below

commit 0bc770d7b61c95ef9a891e781abed40ff4ea4b7c
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jan 15 14:08:55 2025 -0800

    MINOR: avoid WARN logs in KafkaStreams test (#18517)
    
    Avoiding 'WARN Method #getTimestampedKeyValueStore() should be
    used to access a TimestampedKeyValueStore.' running
    KTableKTableForeignKeyJoinIntegrationTest.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../KTableKTableForeignKeyJoinIntegrationTest.java | 37 ++++++++++++----------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
index b4ac10fdbc6..4bd0f74da1d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -174,7 +175,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
             final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? 
driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new 
StringDeserializer()) : null;
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             // Pre-populate the RHS records. This test is all about what 
happens when we add/remove LHS records
             right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
@@ -257,7 +258,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
             // Now delete one LHS entity such that one delete is propagated 
down to the output.
 
-            left.pipeInput("lhs1", (String) null, baseTimestamp + 6);
+            left.pipeInput("lhs1", null, baseTimestamp + 6);
             assertThat(
                 outputTopic.readKeyValuesToMap(),
                 is(mkMap(
@@ -298,7 +299,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             // Pre-populate the LHS records. This test is all about what 
happens when we add/remove RHS records
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
@@ -381,7 +382,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Now delete the RHS entity such that all matching keys have 
deletes propagated.
-            right.pipeInput("rhs1", (String) null, baseTimestamp + 6);
+            right.pipeInput("rhs1", null, baseTimestamp + 6);
 
             assertThat(
                 outputTopic.readKeyValuesToMap(),
@@ -417,7 +418,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
 
@@ -439,7 +440,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // Deleting a non-joining record produces an unnecessary tombstone 
for inner joins, because
             // it's not possible to know whether a result was previously 
emitted.
             // For the left join, the tombstone is necessary.
-            left.pipeInput("lhs1", (String) null, baseTimestamp + 1);
+            left.pipeInput("lhs1", null, baseTimestamp + 1);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -454,7 +455,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
 
             // Deleting a non-existing record is idempotent
-            left.pipeInput("lhs1", (String) null, baseTimestamp + 2);
+            left.pipeInput("lhs1", null, baseTimestamp + 2);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -483,10 +484,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             // Deleting a record that never existed doesn't need to emit 
tombstones.
-            left.pipeInput("lhs1", (String) null, baseTimestamp);
+            left.pipeInput("lhs1", null, baseTimestamp);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
@@ -516,7 +517,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             // no output for a new inner join on a non-existent FK
@@ -623,7 +624,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             // Pre-populate the RHS records. This test is all about what 
happens when we change LHS records foreign key reference
             // then populate update on RHS
@@ -707,7 +708,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             {
@@ -744,11 +745,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, streamsConfig)) {
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
             final String subscriptionStoreName = 
driver.getAllStateStores().entrySet().stream()
                 .filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE"))
                 .findAny().orElseThrow(() -> new RuntimeException("couldn't 
find store")).getKey();
-            final KeyValueStore<Bytes, ValueAndTimestamp<String>> 
subscriptionStore = driver.getKeyValueStore(subscriptionStoreName);
+            final KeyValueStore<Bytes, ValueAndTimestamp<String>> 
subscriptionStore = driver.getTimestampedKeyValueStore(subscriptionStoreName);
             final Bytes key = subscriptionStoreKey("lhs1", "rhs1");
             left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp);
             {
@@ -786,9 +787,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
         return key;
     }
 
-    protected static Map<String, String> asMap(final KeyValueStore<String, 
String> store) {
+    protected static Map<String, String> asMap(final KeyValueStore<String, 
ValueAndTimestamp<String>> store) {
         final HashMap<String, String> result = new HashMap<>();
-        store.all().forEachRemaining(kv -> result.put(kv.key, kv.value));
+        try (final KeyValueIterator<String, ValueAndTimestamp<String>> it = 
store.all()) {
+            it.forEachRemaining(kv -> result.put(kv.key, kv.value.value()));
+        }
         return result;
     }
 
@@ -921,7 +924,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             final TestInputTopic<String, String> right = 
driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestInputTopic<String, String> left = 
driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new 
StringSerializer());
             final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT, new StringDeserializer(), new 
StringDeserializer());
-            final KeyValueStore<String, String> store = 
driver.getKeyValueStore("store");
+            final KeyValueStore<String, ValueAndTimestamp<String>> store = 
driver.getTimestampedKeyValueStore("store");
 
             // RHS record
             right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4);

Reply via email to