This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b154f58ce8b KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API (#16791)
b154f58ce8b is described below

commit b154f58ce8b6b0dfb8122d346213521600a18ce5
Author: JohnHuang <[email protected]>
AuthorDate: Fri Aug 30 07:52:52 2024 +0800

    KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API (#16791)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../java/org/apache/kafka/streams/Topology.java    | 104 ---------------------
 .../processor/internals/ProcessorTopologyTest.java |  51 +---------
 2 files changed, 1 insertion(+), 154 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 0a1ac5c54e8..6b45d70a35c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -820,110 +820,6 @@ public class Topology {
         );
     }
 
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link 
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an 
{@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * The supplier should always generate a new instance each time
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets 
called. Creating a single
-     * {@link org.apache.kafka.streams.processor.Processor} object and 
returning the same object reference in
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} 
would be a violation of the supplier pattern
-     * and leads to runtime exceptions.
-     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     *
-     * @param storeBuilder          user defined state store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, 
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
-     */
-    @Deprecated
-    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> 
storeBuilder,
-                                                       final String sourceName,
-                                                       final Deserializer<K> 
keyDeserializer,
-                                                       final Deserializer<V> 
valueDeserializer,
-                                                       final String topic,
-                                                       final String 
processorName,
-                                                       final 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) 
{
-        internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
-            sourceName,
-            null,
-            keyDeserializer,
-            valueDeserializer,
-            topic,
-            processorName,
-            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
-            true
-        );
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link 
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an 
{@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * The supplier should always generate a new instance each time
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets 
called. Creating a single
-     * {@link org.apache.kafka.streams.processor.Processor} object and 
returning the same object reference in
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} 
would be a violation of the supplier pattern
-     * and leads to runtime exceptions.
-     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined key value store builder
-     * @param sourceName            name of the {@link SourceNode} that will 
be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for 
this source,
-     *                              if not specified the default extractor 
defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize 
keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize 
values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, 
String, TimestampExtractor, Deserializer, Deserializer, String, String, 
ProcessorSupplier)} instead.
-     */
-    @Deprecated
-    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> 
storeBuilder,
-                                                       final String sourceName,
-                                                       final 
TimestampExtractor timestampExtractor,
-                                                       final Deserializer<K> 
keyDeserializer,
-                                                       final Deserializer<V> 
valueDeserializer,
-                                                       final String topic,
-                                                       final String 
processorName,
-                                                       final 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) 
{
-        internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
-            sourceName,
-            timestampExtractor,
-            keyDeserializer,
-            valueDeserializer,
-            topic,
-            processorName,
-            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
-            true
-        );
-        return this;
-    }
-
     /**
      * Adds a global {@link StateStore} to the topology.
      * The {@link StateStore} sources its data from all partitions of the provided input topic.
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 17d125c2c6b..0c39aff9b68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -1268,35 +1268,6 @@ public class ProcessorTopologyTest {
 
     }
 
-    @Deprecated // testing old PAPI
-    @Test
-    public void shouldDriveGlobalStore() {
-        final String storeName = "my-store";
-        final String global = "global";
-        final String topic = "topic";
-
-        topology.addGlobalStore(
-            Stores.keyValueStoreBuilder(
-                Stores.inMemoryKeyValueStore(storeName),
-                Serdes.String(),
-                Serdes.String()
-            ).withLoggingDisabled(),
-            global,
-            STRING_DESERIALIZER,
-            STRING_DESERIALIZER,
-            topic,
-            "processor",
-            define(new OldAPIStatefulProcessor(storeName)));
-
-        driver = new TopologyTestDriver(topology, props);
-        final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(topic, STRING_SERIALIZER, STRING_SERIALIZER);
-        final KeyValueStore<String, String> globalStore = 
driver.getKeyValueStore(storeName);
-        inputTopic.pipeInput("key1", "value1");
-        inputTopic.pipeInput("key2", "value2");
-        assertEquals("value1", globalStore.get("key1"));
-        assertEquals("value2", globalStore.get("key2"));
-    }
-
     @Test
     public void testDrivingSimpleMultiSourceTopology() {
         final int partition = 10;
@@ -1512,18 +1483,6 @@ public class ProcessorTopologyTest {
         assertTrue(processorTopology.hasPersistentLocalStore());
     }
 
-    @Test
-    public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
-        final ProcessorTopology processorTopology = 
createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
-        assertFalse(processorTopology.hasPersistentGlobalStore());
-    }
-
-    @Test
-    public void persistentGlobalStoreShouldBeDetected() {
-        final ProcessorTopology processorTopology = 
createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
-        assertTrue(processorTopology.hasPersistentGlobalStore());
-    }
-
     private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
         final TopologyWrapper topology = new TopologyWrapper();
         final String processor = "processor";
@@ -1535,15 +1494,7 @@ public class ProcessorTopologyTest {
         return topology.getInternalBuilder("anyAppId").buildTopology();
     }
 
-    @Deprecated // testing old PAPI
-    private ProcessorTopology createGlobalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
-        final TopologyWrapper topology = new TopologyWrapper();
-        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String()).withLoggingDisabled();
-        topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, 
STRING_DESERIALIZER, "topic", "processor",
-                define(new OldAPIStatefulProcessor(storeSupplier.name())));
-        return topology.getInternalBuilder("anyAppId").buildTopology();
-    }
+
 
     private void assertNextOutputRecord(final TestRecord<String, String> record,
                                         final String key,

Reply via email to