This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b424553  KAFKA-13553: Add PAPI Window and Session store tests for IQv2 
(#11650)
b424553 is described below

commit b424553101c56547beafab2ae39f16671fc05c9e
Author: John Roesler <[email protected]>
AuthorDate: Wed Jan 5 23:16:33 2022 -0600

    KAFKA-13553: Add PAPI Window and Session store tests for IQv2 (#11650)
    
    During some recent reviews, @mjsax pointed out that StateStore layers
    are constructed differently when the stores are added via the PAPI vs. the DSL.
    
    This PR adds PAPI construction for Window and Session stores to the
    IQv2StoreIntegrationTest so that we can ensure IQv2 works on every
    possible state store.
    
    Reviewer: Guozhang Wang <[email protected]>
---
 .../integration/IQv2StoreIntegrationTest.java      | 129 +++++++++++++++++++--
 1 file changed, 121 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 7629430..11f1b9e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
@@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.StoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -67,7 +69,6 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.AssumptionViolatedException;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -452,11 +453,11 @@ public class IQv2StoreIntegrationTest {
         } else if (Objects.equals(kind, "DSL") && supplier instanceof 
WindowBytesStoreSupplier) {
             setUpWindowDSLTopology((WindowBytesStoreSupplier) supplier, 
builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof 
WindowBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpWindowPAPITopology((WindowBytesStoreSupplier) supplier, 
builder);
         } else if (Objects.equals(kind, "DSL") && supplier instanceof 
SessionBytesStoreSupplier) {
             setUpSessionDSLTopology((SessionBytesStoreSupplier) supplier, 
builder);
         } else if (Objects.equals(kind, "PAPI") && supplier instanceof 
SessionBytesStoreSupplier) {
-            throw new AssumptionViolatedException("Case not implemented yet");
+            setUpSessionPAPITopology((SessionBytesStoreSupplier) supplier, 
builder);
         } else {
             throw new AssertionError("Store supplier is an unrecognized 
type.");
         }
@@ -626,6 +627,118 @@ public class IQv2StoreIntegrationTest {
 
     }
 
+    private void setUpWindowPAPITopology(final WindowBytesStoreSupplier 
supplier,
+                                         final StreamsBuilder builder) {
+        final StoreBuilder<?> windowStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> 
processorSupplier;
+        if (storeToTest.timestamped()) {
+            windowStoreStoreBuilder = Stores.timestampedWindowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier = () -> new ContextualProcessor<Integer, 
Integer, Void, Void>() {
+                @Override
+                public void process(final Record<Integer, Integer> record) {
+                    final TimestampedWindowStore<Integer, Integer> stateStore =
+                        
context().getStateStore(windowStoreStoreBuilder.name());
+                    stateStore.put(
+                        record.key(),
+                        ValueAndTimestamp.make(
+                            record.value(), record.timestamp()
+                        ),
+                        WINDOW_START
+                    );
+                }
+            };
+        } else {
+            windowStoreStoreBuilder = Stores.windowStoreBuilder(
+                supplier,
+                Serdes.Integer(),
+                Serdes.Integer()
+            );
+            processorSupplier =
+                () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
+                    @Override
+                    public void process(final Record<Integer, Integer> record) 
{
+                        final WindowStore<Integer, Integer> stateStore =
+                            
context().getStateStore(windowStoreStoreBuilder.name());
+                        stateStore.put(record.key(), record.value(), 
WINDOW_START);
+                    }
+                };
+        }
+        if (cache) {
+            windowStoreStoreBuilder.withCachingEnabled();
+        } else {
+            windowStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            windowStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            windowStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                windowStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(windowStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()))
+                .process(processorSupplier, windowStoreStoreBuilder.name());
+        }
+
+    }
+
+    private void setUpSessionPAPITopology(final SessionBytesStoreSupplier 
supplier,
+                                          final StreamsBuilder builder) {
+        final StoreBuilder<?> sessionStoreStoreBuilder;
+        final ProcessorSupplier<Integer, Integer, Void, Void> 
processorSupplier;
+        sessionStoreStoreBuilder = Stores.sessionStoreBuilder(
+            supplier,
+            Serdes.Integer(),
+            Serdes.Integer()
+        );
+        processorSupplier = () -> new ContextualProcessor<Integer, Integer, 
Void, Void>() {
+            @Override
+            public void process(final Record<Integer, Integer> record) {
+                final SessionStore<Integer, Integer> stateStore =
+                    context().getStateStore(sessionStoreStoreBuilder.name());
+                stateStore.put(
+                    new Windowed<>(record.key(), new 
SessionWindow(WINDOW_START, WINDOW_START)),
+                    record.value()
+                );
+            }
+        };
+        if (cache) {
+            sessionStoreStoreBuilder.withCachingEnabled();
+        } else {
+            sessionStoreStoreBuilder.withCachingDisabled();
+        }
+        if (log) {
+            
sessionStoreStoreBuilder.withLoggingEnabled(Collections.emptyMap());
+        } else {
+            sessionStoreStoreBuilder.withLoggingDisabled();
+        }
+        if (storeToTest.global()) {
+            builder.addGlobalStore(
+                sessionStoreStoreBuilder,
+                INPUT_TOPIC_NAME,
+                Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                processorSupplier
+            );
+        } else {
+            builder.addStateStore(sessionStoreStoreBuilder);
+            builder
+                .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), 
Serdes.Integer()))
+                .process(processorSupplier, sessionStoreStoreBuilder.name());
+        }
+
+    }
+
 
     @After
     public void afterTest() {
@@ -800,13 +913,13 @@ public class IQv2StoreIntegrationTest {
                     throw new AssertionError(queryResult.toString());
                 }
                 assertThat(partitionResult.getFailureReason(), 
is(FailureReason.UNKNOWN_QUERY_TYPE));
-                assertThat(partitionResult.getFailureMessage(), is(
+                assertThat(partitionResult.getFailureMessage(), matchesPattern(
                     "This store"
-                        + " (class 
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore)"
+                        + " \\(class 
org.apache.kafka.streams.state.internals.Metered.*WindowStore\\)"
                         + " doesn't know how to execute the given query"
-                        + " (WindowRangeQuery{key=Optional[2], 
timeFrom=Optional.empty, timeTo=Optional.empty})"
-                        + " because WindowStores only supports 
WindowRangeQuery.withWindowStartRange."
-                        + " Contact the store maintainer if you need support 
for a new query type."
+                        + " \\(WindowRangeQuery\\{key=Optional\\[2], 
timeFrom=Optional.empty, timeTo=Optional.empty}\\)"
+                        + " because WindowStores only supports 
WindowRangeQuery.withWindowStartRange\\."
+                        + " Contact the store maintainer if you need support 
for a new query type\\."
                 ));
             }
         }

Reply via email to