This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b154f58ce8b KAFKA-12829: Remove deprecated Topology#addGlobalStore of
old Processor API (#16791)
b154f58ce8b is described below
commit b154f58ce8b6b0dfb8122d346213521600a18ce5
Author: JohnHuang <[email protected]>
AuthorDate: Fri Aug 30 07:52:52 2024 +0800
KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API
(#16791)
Reviewers: Matthias J. Sax <[email protected]>
---
.../java/org/apache/kafka/streams/Topology.java | 104 ---------------------
.../processor/internals/ProcessorTopologyTest.java | 51 +---------
2 files changed, 1 insertion(+), 154 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 0a1ac5c54e8..6b45d70a35c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -820,110 +820,6 @@ public class Topology {
);
}
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka
Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to
consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()}
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
- * The default {@link TimestampExtractor} as specified in the {@link
StreamsConfig config} is used.
- *
- * @param storeBuilder user defined state store builder
- * @param sourceName name of the {@link SourceNode} that will
be automatically added
- * @param keyDeserializer the {@link Deserializer} to deserialize
keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize
values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already
registered
- * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder,
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
- */
- @Deprecated
- public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
- final String sourceName,
- final Deserializer<K>
keyDeserializer,
- final Deserializer<V>
valueDeserializer,
- final String topic,
- final String
processorName,
- final
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier)
{
- internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
- sourceName,
- null,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
- true
- );
- return this;
- }
-
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka
Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to
consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * The supplier should always generate a new instance each time
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets
called. Creating a single
- * {@link org.apache.kafka.streams.processor.Processor} object and
returning the same object reference in
- * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()}
would be a violation of the supplier pattern
- * and leads to runtime exceptions.
- * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
- *
- * @param storeBuilder user defined key value store builder
- * @param sourceName name of the {@link SourceNode} that will
be automatically added
- * @param timestampExtractor the stateless timestamp extractor used for
this source,
- * if not specified the default extractor
defined in the configs will be used
- * @param keyDeserializer the {@link Deserializer} to deserialize
keys with
- * @param valueDeserializer the {@link Deserializer} to deserialize
values with
- * @param topic the topic to source the data from
- * @param processorName the name of the {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already
registered
- * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder,
String, TimestampExtractor, Deserializer, Deserializer, String, String,
ProcessorSupplier)} instead.
- */
- @Deprecated
- public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
- final String sourceName,
- final
TimestampExtractor timestampExtractor,
- final Deserializer<K>
keyDeserializer,
- final Deserializer<V>
valueDeserializer,
- final String topic,
- final String
processorName,
- final
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier)
{
- internalTopologyBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
- sourceName,
- timestampExtractor,
- keyDeserializer,
- valueDeserializer,
- topic,
- processorName,
- () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
- true
- );
- return this;
- }
-
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the
provided input topic.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 17d125c2c6b..0c39aff9b68 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -1268,35 +1268,6 @@ public class ProcessorTopologyTest {
}
- @Deprecated // testing old PAPI
- @Test
- public void shouldDriveGlobalStore() {
- final String storeName = "my-store";
- final String global = "global";
- final String topic = "topic";
-
- topology.addGlobalStore(
- Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore(storeName),
- Serdes.String(),
- Serdes.String()
- ).withLoggingDisabled(),
- global,
- STRING_DESERIALIZER,
- STRING_DESERIALIZER,
- topic,
- "processor",
- define(new OldAPIStatefulProcessor(storeName)));
-
- driver = new TopologyTestDriver(topology, props);
- final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic, STRING_SERIALIZER, STRING_SERIALIZER);
- final KeyValueStore<String, String> globalStore =
driver.getKeyValueStore(storeName);
- inputTopic.pipeInput("key1", "value1");
- inputTopic.pipeInput("key2", "value2");
- assertEquals("value1", globalStore.get("key1"));
- assertEquals("value2", globalStore.get("key2"));
- }
-
@Test
public void testDrivingSimpleMultiSourceTopology() {
final int partition = 10;
@@ -1512,18 +1483,6 @@ public class ProcessorTopologyTest {
assertTrue(processorTopology.hasPersistentLocalStore());
}
- @Test
- public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
- final ProcessorTopology processorTopology =
createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
- assertFalse(processorTopology.hasPersistentGlobalStore());
- }
-
- @Test
- public void persistentGlobalStoreShouldBeDetected() {
- final ProcessorTopology processorTopology =
createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
- assertTrue(processorTopology.hasPersistentGlobalStore());
- }
-
private ProcessorTopology createLocalStoreTopology(final
KeyValueBytesStoreSupplier storeSupplier) {
final TopologyWrapper topology = new TopologyWrapper();
final String processor = "processor";
@@ -1535,15 +1494,7 @@ public class ProcessorTopologyTest {
return topology.getInternalBuilder("anyAppId").buildTopology();
}
- @Deprecated // testing old PAPI
- private ProcessorTopology createGlobalStoreTopology(final
KeyValueBytesStoreSupplier storeSupplier) {
- final TopologyWrapper topology = new TopologyWrapper();
- final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
- Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(),
Serdes.String()).withLoggingDisabled();
- topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER,
STRING_DESERIALIZER, "topic", "processor",
- define(new OldAPIStatefulProcessor(storeSupplier.name())));
- return topology.getInternalBuilder("anyAppId").buildTopology();
- }
+
private void assertNextOutputRecord(final TestRecord<String, String>
record,
final String key,