This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6224feee656 MINOR: update StandbyTaskCreationIntegrationTest (#16700)
6224feee656 is described below
commit 6224feee656303d66467647f4a71f0a3daa3b2b8
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jul 31 11:27:38 2024 -0700
MINOR: update StandbyTaskCreationIntegrationTest (#16700)
Refactor test to move off deprecated `transform()` in favor of
`process()`.
Reviewers: Bill Bejeck <[email protected]>
---
.../StandbyTaskCreationIntegrationTest.java | 35 +++++++++-------------
1 file changed, 14 insertions(+), 21 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index d2002f1a76e..aa08da6da26 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
@@ -27,8 +26,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -87,20 +86,19 @@ public class StandbyTaskCreationIntegrationTest {
client2.close();
}
- private Properties streamsConfiguration(final TestInfo testInfo) {
+ private Properties streamsConfiguration() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
return streamsConfiguration;
}
@Test
- @SuppressWarnings("deprecation")
- public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo
testInfo) throws Exception {
+ public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws
Exception {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>>
keyValueStoreBuilder =
@@ -109,21 +107,16 @@ public class StandbyTaskCreationIntegrationTest {
Serdes.Integer()).withLoggingDisabled();
builder.addStateStore(keyValueStoreBuilder);
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(),
Serdes.Integer()))
- .transform(() -> new Transformer<Integer, Integer,
KeyValue<Integer, Integer>>() {
+ // don't use method reference below as it won't create a new
`Processor` instance but re-use the same object
+ .process(() -> new Processor<Integer, Integer, Object, Object>() {
@Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public KeyValue<Integer, Integer> transform(final Integer key,
final Integer value) {
- return null;
+ public void process(final Record<Integer, Integer> record) {
+ // no-op
}
-
- @Override
- public void close() {}
}, stateStoreName);
final Topology topology = builder.build();
- createClients(topology, streamsConfiguration(testInfo), topology,
streamsConfiguration(testInfo));
+ createClients(topology, streamsConfiguration(), topology,
streamsConfiguration());
setStateListenersForVerification(thread ->
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
@@ -135,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest {
}
@Test
- public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo
testInfo) throws Exception {
- final Properties streamsConfiguration1 =
streamsConfiguration(testInfo);
+ public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws
Exception {
+ final Properties streamsConfiguration1 = streamsConfiguration();
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- final Properties streamsConfiguration2 =
streamsConfiguration(testInfo);
+ final Properties streamsConfiguration2 = streamsConfiguration();
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();