This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit abf74de663e4646b40486b976e1d79639f9568c3
Author: Almog Gavra <almog.ga...@gmail.com>
AuthorDate: Tue Dec 17 23:45:49 2024 -0800

    KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier (#18150)
    
    Migrates KTableSuppressProcessorSupplier to use the 
ProcessorSupplier#stores() method
    
    Reviewers: Guozhang Wang <guozhang.wang...@gmail.com>, Anna Sophie 
Blee-Goldman <ableegold...@apache.org>
---
 .../streams/kstream/internals/KTableImpl.java      | 15 ++++++-------
 .../kstream/internals/graph/TableSuppressNode.java |  6 ++----
 .../suppress/KTableSuppressProcessorSupplier.java  | 20 +++++++++++------
 .../apache/kafka/streams/StreamsBuilderTest.java   | 25 ++++++++++++++++++++++
 .../KTableSuppressProcessorMetricsTest.java        | 10 ++++++++-
 .../suppress/KTableSuppressProcessorTest.java      | 10 ++++++++-
 6 files changed, 66 insertions(+), 20 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 456b2fbb47f..5367b8dede2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -550,12 +549,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
         final String storeName =
             suppressedInternal.name() != null ? suppressedInternal.name() + 
"-store" : builder.newStoreName(SUPPRESS_NAME);
 
-        final ProcessorSupplier<K, Change<V>, K, Change<V>> 
suppressionSupplier = new KTableSuppressProcessorSupplier<>(
-            suppressedInternal,
-            storeName,
-            this
-        );
-
         final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, 
Change<V>>> storeBuilder;
 
         if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
@@ -573,10 +566,16 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
                 .withLoggingDisabled();
         }
 
+        final ProcessorSupplier<K, Change<V>, K, Change<V>> 
suppressionSupplier = new KTableSuppressProcessorSupplier<>(
+                suppressedInternal,
+                storeBuilder,
+                this
+        );
+
         final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
             name,
             new ProcessorParameters<>(suppressionSupplier, name),
-            StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
+            new String[]{storeName}
         );
         node.setOutputVersioned(false);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
index 88e55f37a25..595d0266aae 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java
@@ -16,12 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-
 public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> {
     public TableSuppressNode(final String nodeName,
                              final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                             final StoreFactory 
materializedKTableStoreBuilder) {
-        super(nodeName, processorParameters, materializedKTableStoreBuilder);
+                             final String[] storeNames) {
+        super(nodeName, processorParameters, storeNames);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 3f98d444bb2..eaf6b681e74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -33,23 +33,26 @@ import 
org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import 
org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Maybe;
 import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
+import java.util.Set;
+
 import static java.util.Objects.requireNonNull;
 
 public class KTableSuppressProcessorSupplier<K, V> implements
     KTableProcessorSupplier<K, V, K, V> {
     private final SuppressedInternal<K> suppress;
-    private final String storeName;
+    private final StoreBuilder<?> storeBuilder;
     private final KTableImpl<K, ?, V> parentKTable;
 
     public KTableSuppressProcessorSupplier(final SuppressedInternal<K> 
suppress,
-                                           final String storeName,
+                                           final StoreBuilder<?> storeBuilder,
                                            final KTableImpl<K, ?, V> 
parentKTable) {
         this.suppress = suppress;
-        this.storeName = storeName;
+        this.storeBuilder = storeBuilder;
         this.parentKTable = parentKTable;
         // The suppress buffer requires seeing the old values, to support the 
prior value view.
         parentKTable.enableSendingOldValues(true);
@@ -57,7 +60,12 @@ public class KTableSuppressProcessorSupplier<K, V> implements
 
     @Override
     public Processor<K, Change<V>, K, Change<V>> get() {
-        return new KTableSuppressProcessor<>(suppress, storeName);
+        return new KTableSuppressProcessor<>(suppress, storeBuilder.name());
+    }
+
+    @Override
+    public Set<StoreBuilder<?>> stores() {
+        return Set.of(storeBuilder);
     }
 
     @Override
@@ -75,7 +83,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements
                     public void init(final ProcessorContext<?, ?> context) {
                         parentGetter.init(context);
                         // the main processor is responsible for the buffer's 
lifecycle
-                        buffer = 
requireNonNull(context.getStateStore(storeName));
+                        buffer = 
requireNonNull(context.getStateStore(storeBuilder.name()));
                     }
 
                     @Override
@@ -107,7 +115,7 @@ public class KTableSuppressProcessorSupplier<K, V> 
implements
                 final String[] parentStores = 
parentValueGetterSupplier.storeNames();
                 final String[] stores = new String[1 + parentStores.length];
                 System.arraycopy(parentStores, 0, stores, 1, 
parentStores.length);
-                stores[0] = storeName;
+                stores[0] = storeBuilder.name();
                 return stores;
             }
         };
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 71905e1481c..08e413703c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.StreamJoined;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TableJoined;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.StateStore;
@@ -1517,6 +1518,30 @@ public class StreamsBuilderTest {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1));
     }
 
+    @Test
+    public void shouldWrapProcessorsForSuppress() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        builder.stream("input", Consumed.as("source"))
+                .groupByKey()
+                .count(Named.as("count"))// wrapped 1
+                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), 
Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2
+                .toStream(Named.as("toStream"))// wrapped 3
+                .to("output", Produced.as("sink"));
+
+        builder.build();
+        assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3));
+        assertThat(counter.wrappedProcessorNames(), 
Matchers.containsInAnyOrder("count", "toStream", "suppressed"));
+        assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2));
+        assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
+    }
+
     @Test
     public void shouldWrapProcessorsForTimeWindowStreamAggregate() {
         final Map<Object, Object> props = dummyStreamsConfigMap();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
index 4f94bac545f..728fdd72f83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.StoreBuilder;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -37,6 +38,7 @@ import org.apache.kafka.test.TestUtils;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -143,7 +145,7 @@ public class KTableSuppressProcessorMetricsTest {
         final Processor<String, Change<Long>, String, Change<Long>> processor =
             new KTableSuppressProcessorSupplier<>(
                 (SuppressedInternal<String>) 
Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)),
-                storeName,
+                mockBuilderWithName(storeName),
                 mock
             ).get();
 
@@ -205,4 +207,10 @@ public class KTableSuppressProcessorMetricsTest {
         assertThat(metrics.get(metricName).metricName().description(), 
is(metricName.description()));
         assertThat((T) metrics.get(metricName).metricValue(), matcher);
     }
+
+    private StoreBuilder<?> mockBuilderWithName(final String name) {
+        final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class);
+        Mockito.when(builder.name()).thenReturn(name);
+        return builder;
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 28f9d24cf5a..dc8dd5dc358 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.state.StoreBuilder;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
 import org.apache.kafka.test.MockInternalProcessorContext;
 
@@ -42,6 +43,7 @@ import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.mockito.junit.jupiter.MockitoSettings;
 import org.mockito.quality.Strictness;
@@ -91,7 +93,7 @@ public class KTableSuppressProcessorTest {
             @SuppressWarnings("unchecked")
             final KTableImpl<K, ?, V> parent = mock(KTableImpl.class);
             final Processor<K, Change<V>, K, Change<V>> processor =
-                new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) 
suppressed, storeName, parent).get();
+                new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) 
suppressed, mockBuilderWithName(storeName), parent).get();
 
             final MockInternalProcessorContext<K, Change<V>> context = new 
MockInternalProcessorContext<>();
             context.setCurrentNode(new ProcessorNode<>("testNode"));
@@ -487,4 +489,10 @@ public class KTableSuppressProcessorTest {
             new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize)
         );
     }
+
+    private static StoreBuilder<?> mockBuilderWithName(final String name) {
+        final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class);
+        Mockito.when(builder.name()).thenReturn(name);
+        return builder;
+    }
 }

Reply via email to