This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0bc770d7b61 MINOR: avoid WARN logs in KafkaStreams test (#18517) 0bc770d7b61 is described below commit 0bc770d7b61c95ef9a891e781abed40ff4ea4b7c Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Wed Jan 15 14:08:55 2025 -0800 MINOR: avoid WARN logs in KafkaStreams test (#18517) Avoiding 'WARN Method #getTimestampedKeyValueStore() should be used to access a TimestampedKeyValueStore.' running KTableKTableForeignKeyJoinIntegrationTest. Reviewers: Bill Bejeck <b...@confluent.io> --- .../KTableKTableForeignKeyJoinIntegrationTest.java | 37 ++++++++++++---------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index b4ac10fdbc6..4bd0f74da1d 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -174,7 +175,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new 
StringDeserializer(), new StringDeserializer()); final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null; - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); // Pre-populate the RHS records. This test is all about what happens when we add/remove LHS records right.pipeInput("rhs1", "rhsValue1", baseTimestamp); @@ -257,7 +258,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Now delete one LHS entity such that one delete is propagated down to the output. - left.pipeInput("lhs1", (String) null, baseTimestamp + 6); + left.pipeInput("lhs1", null, baseTimestamp + 6); assertThat( outputTopic.readKeyValuesToMap(), is(mkMap( @@ -298,7 +299,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); // Pre-populate the LHS records. This test is all about what happens when we add/remove RHS records left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); @@ -381,7 +382,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Now delete the RHS entity such that all matching keys have deletes propagated. 
- right.pipeInput("rhs1", (String) null, baseTimestamp + 6); + right.pipeInput("rhs1", null, baseTimestamp + 6); assertThat( outputTopic.readKeyValuesToMap(), @@ -417,7 +418,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); @@ -439,7 +440,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because // it's not possible to know whether a result was previously emitted. // For the left join, the tombstone is necessary. 
- left.pipeInput("lhs1", (String) null, baseTimestamp + 1); + left.pipeInput("lhs1", null, baseTimestamp + 1); { assertThat( outputTopic.readKeyValuesToMap(), @@ -454,7 +455,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { } // Deleting a non-existing record is idempotent - left.pipeInput("lhs1", (String) null, baseTimestamp + 2); + left.pipeInput("lhs1", null, baseTimestamp + 2); { assertThat( outputTopic.readKeyValuesToMap(), @@ -483,10 +484,10 @@ public class KTableKTableForeignKeyJoinIntegrationTest { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); // Deleting a record that never existed doesn't need to emit tombstones. 
- left.pipeInput("lhs1", (String) null, baseTimestamp); + left.pipeInput("lhs1", null, baseTimestamp); { assertThat( outputTopic.readKeyValuesToMap(), @@ -516,7 +517,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); // no output for a new inner join on a non-existent FK @@ -623,7 +624,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); // Pre-populate the RHS records. 
This test is all about what happens when we change LHS records foreign key reference // then populate update on RHS @@ -707,7 +708,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp); { @@ -744,11 +745,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest { try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) { final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); final String subscriptionStoreName = driver.getAllStateStores().entrySet().stream() .filter(e -> e.getKey().contains("SUBSCRIPTION-STATE-STORE")) .findAny().orElseThrow(() -> new RuntimeException("couldn't find store")).getKey(); - final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getKeyValueStore(subscriptionStoreName); + final KeyValueStore<Bytes, ValueAndTimestamp<String>> subscriptionStore = driver.getTimestampedKeyValueStore(subscriptionStoreName); final Bytes key = subscriptionStoreKey("lhs1", "rhs1"); left.pipeInput("lhs1", 
"lhsValue1|rhs1", baseTimestamp); { @@ -786,9 +787,11 @@ public class KTableKTableForeignKeyJoinIntegrationTest { return key; } - protected static Map<String, String> asMap(final KeyValueStore<String, String> store) { + protected static Map<String, String> asMap(final KeyValueStore<String, ValueAndTimestamp<String>> store) { final HashMap<String, String> result = new HashMap<>(); - store.all().forEachRemaining(kv -> result.put(kv.key, kv.value)); + try (final KeyValueIterator<String, ValueAndTimestamp<String>> it = store.all()) { + it.forEachRemaining(kv -> result.put(kv.key, kv.value.value())); + } return result; } @@ -921,7 +924,7 @@ public class KTableKTableForeignKeyJoinIntegrationTest { final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer()); final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer()); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer()); - final KeyValueStore<String, String> store = driver.getKeyValueStore("store"); + final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store"); // RHS record right.pipeInput("rhs1", "rhsValue1", baseTimestamp + 4);