This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit abf74de663e4646b40486b976e1d79639f9568c3 Author: Almog Gavra <almog.ga...@gmail.com> AuthorDate: Tue Dec 17 23:45:49 2024 -0800 KAFKA-18026: KIP-1112 migrate KTableSuppressProcessorSupplier (#18150) Migrates KTableSuppressProcessorSupplier to use the the ProcessorSupplier#stores() method Reviewers: Guozhang Wang <guozhang.wang...@gmail.com>, Anna Sophie Blee-Goldman <ableegold...@apache.org> --- .../streams/kstream/internals/KTableImpl.java | 15 ++++++------- .../kstream/internals/graph/TableSuppressNode.java | 6 ++---- .../suppress/KTableSuppressProcessorSupplier.java | 20 +++++++++++------ .../apache/kafka/streams/StreamsBuilderTest.java | 25 ++++++++++++++++++++++ .../KTableSuppressProcessorMetricsTest.java | 10 ++++++++- .../suppress/KTableSuppressProcessorTest.java | 10 ++++++++- 6 files changed, 66 insertions(+), 20 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 456b2fbb47f..5367b8dede2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -69,7 +69,6 @@ import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.InternalTopicProperties; import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; -import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -550,12 +549,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< final String storeName = suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); - final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>( - suppressedInternal, - storeName, - this - ); - final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> storeBuilder; if (suppressedInternal.bufferConfig().isLoggingEnabled()) { @@ -573,10 +566,16 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< .withLoggingDisabled(); } + final ProcessorSupplier<K, Change<V>, K, Change<V>> suppressionSupplier = new KTableSuppressProcessorSupplier<>( + suppressedInternal, + storeBuilder, + this + ); + final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - StoreBuilderWrapper.wrapStoreBuilder(storeBuilder) + new String[]{storeName} ); node.setOutputVersioned(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java index 88e55f37a25..595d0266aae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSuppressNode.java @@ -16,12 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -import org.apache.kafka.streams.processor.internals.StoreFactory; - public class TableSuppressNode<K, V> extends StatefulProcessorNode<K, V> { public TableSuppressNode(final String nodeName, final ProcessorParameters<K, V, ?, ?> processorParameters, - final StoreFactory materializedKTableStoreBuilder) { - super(nodeName, processorParameters, materializedKTableStoreBuilder); + final String[] storeNames) { + super(nodeName, processorParameters, storeNames); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java index 3f98d444bb2..eaf6b681e74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java @@ -33,23 +33,26 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.SerdeGetter; import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.internals.Maybe; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; +import java.util.Set; + import static java.util.Objects.requireNonNull; public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, K, V> { private final SuppressedInternal<K> suppress; - private final String storeName; + private final StoreBuilder<?> storeBuilder; private final KTableImpl<K, ?, V> parentKTable; public KTableSuppressProcessorSupplier(final SuppressedInternal<K> suppress, - final String storeName, + final StoreBuilder<?> storeBuilder, final KTableImpl<K, ?, V> parentKTable) { this.suppress = suppress; - this.storeName = storeName; + this.storeBuilder = storeBuilder; this.parentKTable = parentKTable; // The suppress buffer requires seeing the old values, to support the prior value view. parentKTable.enableSendingOldValues(true); @@ -57,7 +60,12 @@ public class KTableSuppressProcessorSupplier<K, V> implements @Override public Processor<K, Change<V>, K, Change<V>> get() { - return new KTableSuppressProcessor<>(suppress, storeName); + return new KTableSuppressProcessor<>(suppress, storeBuilder.name()); + } + + @Override + public Set<StoreBuilder<?>> stores() { + return Set.of(storeBuilder); } @Override @@ -75,7 +83,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements public void init(final ProcessorContext<?, ?> context) { parentGetter.init(context); // the main processor is responsible for the buffer's lifecycle - buffer = requireNonNull(context.getStateStore(storeName)); + buffer = requireNonNull(context.getStateStore(storeBuilder.name())); } @Override @@ -107,7 +115,7 @@ public class KTableSuppressProcessorSupplier<K, V> implements final String[] parentStores = parentValueGetterSupplier.storeNames(); final String[] stores = new String[1 + parentStores.length]; System.arraycopy(parentStores, 0, stores, 1, parentStores.length); - stores[0] = storeName; + stores[0] = storeBuilder.name(); return stores; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 71905e1481c..08e413703c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.StreamJoined; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.processor.StateStore; @@ -1517,6 +1518,30 @@ public class StreamsBuilderTest { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(1)); } + @Test + public void shouldWrapProcessorsForSuppress() { + final Map<Object, Object> props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + builder.stream("input", Consumed.as("source")) + .groupByKey() + .count(Named.as("count"))// wrapped 1 + .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10), Suppressed.BufferConfig.unbounded()).withName("suppressed")) // wrapped 2 + .toStream(Named.as("toStream"))// wrapped 3 + .to("output", Produced.as("sink")); + + builder.build(); + assertThat(counter.numWrappedProcessors(), CoreMatchers.is(3)); + assertThat(counter.wrappedProcessorNames(), Matchers.containsInAnyOrder("count", "toStream", "suppressed")); + assertThat(counter.numUniqueStateStores(), CoreMatchers.is(2)); + assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); + } + @Test public void shouldWrapProcessorsForTimeWindowStreamAggregate() { final Map<Object, Object> props = dummyStreamsConfigMap(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java index 4f94bac545f..728fdd72f83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalProcessorContext; import org.apache.kafka.test.StreamsTestUtils; @@ -37,6 +38,7 @@ import org.apache.kafka.test.TestUtils; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -143,7 +145,7 @@ public class KTableSuppressProcessorMetricsTest { final Processor<String, Change<Long>, String, Change<Long>> processor = new KTableSuppressProcessorSupplier<>( (SuppressedInternal<String>) Suppressed.<String>untilTimeLimit(Duration.ofDays(100), maxRecords(1)), - storeName, + mockBuilderWithName(storeName), mock ).get(); @@ -205,4 +207,10 @@ public class KTableSuppressProcessorMetricsTest { assertThat(metrics.get(metricName).metricName().description(), is(metricName.description())); assertThat((T) metrics.get(metricName).metricValue(), matcher); } + + private StoreBuilder<?> mockBuilderWithName(final String name) { + final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 28f9d24cf5a..dc8dd5dc358 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.api.MockProcessorContext; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; import org.apache.kafka.test.MockInternalProcessorContext; @@ -42,6 +43,7 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -91,7 +93,7 @@ public class KTableSuppressProcessorTest { @SuppressWarnings("unchecked") final KTableImpl<K, ?, V> parent = mock(KTableImpl.class); final Processor<K, Change<V>, K, Change<V>> processor = - new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, storeName, parent).get(); + new KTableSuppressProcessorSupplier<>((SuppressedInternal<K>) suppressed, mockBuilderWithName(storeName), parent).get(); final MockInternalProcessorContext<K, Change<V>> context = new MockInternalProcessorContext<>(); context.setCurrentNode(new ProcessorNode<>("testNode")); @@ -487,4 +489,10 @@ public class KTableSuppressProcessorTest { new TimeWindowedDeserializer<>(kSerde.deserializer(), windowSize) ); } + + private static StoreBuilder<?> mockBuilderWithName(final String name) { + final StoreBuilder<?> builder = Mockito.mock(StoreBuilder.class); + Mockito.when(builder.name()).thenReturn(name); + return builder; + } }