This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39cacca89b0 KAFKA-15774: refactor windowed stores to use StoreFactory 
(#14708)
39cacca89b0 is described below

commit 39cacca89b0396398a783307e77df31b7ebf9e5e
Author: Almog Gavra <[email protected]>
AuthorDate: Fri Nov 10 18:19:11 2023 -0800

    KAFKA-15774: refactor windowed stores to use StoreFactory (#14708)
    
    This is a follow up from #14659 that ports the windowed classes to use the 
StoreFactory abstraction as well. There's a side benefit of not duplicating the 
materialization code twice for each StreamImpl/CogroupedStreamImpl class as 
well.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Matthias Sax 
<[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |   4 +-
 .../org/apache/kafka/streams/TopologyConfig.java   |   3 +-
 .../apache/kafka/streams/kstream/Materialized.java |  12 --
 .../internals/KeyValueStoreMaterializer.java       |  65 +---------
 .../kstream/internals/MaterializedInternal.java    |  12 ++
 .../internals/MaterializedStoreFactory.java        |  89 +++++++++++++
 .../internals/SessionStoreMaterializer.java        | 132 +++++++++++++++++++
 .../SessionWindowedCogroupedKStreamImpl.java       |  64 +---------
 .../internals/SessionWindowedKStreamImpl.java      |  74 +----------
 .../internals/SlidingWindowStoreMaterializer.java  | 140 +++++++++++++++++++++
 .../SlidingWindowedCogroupedKStreamImpl.java       |  68 +---------
 .../internals/SlidingWindowedKStreamImpl.java      |  81 +-----------
 .../TimeWindowedCogroupedKStreamImpl.java          |  69 +---------
 .../kstream/internals/TimeWindowedKStreamImpl.java |  76 +----------
 .../kstream/internals/WindowStoreMaterializer.java | 132 +++++++++++++++++++
 .../internals/InternalTopologyBuilder.java         |  47 +++----
 16 files changed, 553 insertions(+), 515 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4d82703c5f0..3d37d13e9b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -535,7 +535,7 @@ public class StreamsConfig extends AbstractConfig {
 
     public static final String ROCKS_DB = "rocksDB";
     public static final String IN_MEMORY = "in_memory";
-    public static final String DEFAULT_DSL_STORE_DEFAULT = ROCKS_DB;
+    public static final String DEFAULT_DSL_STORE = ROCKS_DB;
 
     /** {@code default.windowed.key.serde.inner} */
     @SuppressWarnings("WeakerAccess")
@@ -1001,7 +1001,7 @@ public class StreamsConfig extends AbstractConfig {
                     CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
             .define(DEFAULT_DSL_STORE_CONFIG,
                     Type.STRING,
-                    DEFAULT_DSL_STORE_DEFAULT,
+                    DEFAULT_DSL_STORE,
                     in(ROCKS_DB, IN_MEMORY),
                     Importance.LOW,
                     DEFAULT_DSL_STORE_DOC)
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 38991254e13..70f774ef860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import 
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
@@ -218,7 +219,7 @@ public class TopologyConfig extends AbstractConfig {
     }
 
     public Materialized.StoreType parseStoreType() {
-        return Materialized.StoreType.parse(storeType);
+        return MaterializedInternal.parse(storeType);
     }
 
     public boolean isNamedTopology() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index b45bbff0e49..9dfb4189d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -71,17 +70,6 @@ public class Materialized<K, V, S extends StateStore> {
     public enum StoreType {
         ROCKS_DB,
         IN_MEMORY;
-
-        public static StoreType parse(final String storeType) {
-            switch (storeType) {
-                case StreamsConfig.IN_MEMORY:
-                    return StoreType.IN_MEMORY;
-                case StreamsConfig.ROCKS_DB:
-                    return StoreType.ROCKS_DB;
-                default:
-                    throw new IllegalStateException("Unexpected storeType: " + 
storeType);
-            }
-        }
     }
 
     private Materialized(final StoreSupplier<S> storeSupplier) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 89814617ff9..3361d65c464 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,10 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -31,38 +28,17 @@ import 
org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Materializes a key-value store as either a {@link 
TimestampedKeyValueStoreBuilder} or a
  * {@link VersionedKeyValueStoreBuilder} depending on whether the store is 
versioned or not.
  */
-public class KeyValueStoreMaterializer<K, V> implements StoreFactory {
+public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V, KeyValueStore<Bytes, byte[]>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(KeyValueStoreMaterializer.class);
 
-    private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materialized;
-    private final Set<String> connectedProcessorNames = new HashSet<>();
-
-    private Materialized.StoreType defaultStoreType
-            = 
Materialized.StoreType.parse(StreamsConfig.DEFAULT_DSL_STORE_DEFAULT);
-
     public KeyValueStoreMaterializer(
             final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materialized
     ) {
-        this.materialized = materialized;
-
-        // this condition will never be false; in the next PR we will
-        // remove the initialization of storeType from MaterializedInternal
-        if (materialized.storeType() != null) {
-            defaultStoreType = materialized.storeType;
-        }
-    }
-
-    @Override
-    public void configure(final StreamsConfig config) {
-        // in a follow-up PR, this will set the defaultStoreType to the 
configured value
+        super(materialized);
     }
 
     @Override
@@ -127,21 +103,6 @@ public class KeyValueStoreMaterializer<K, V> implements 
StoreFactory {
         return ((VersionedBytesStoreSupplier) 
materialized.storeSupplier()).historyRetentionMs();
     }
 
-    @Override
-    public Set<String> connectedProcessorNames() {
-        return connectedProcessorNames;
-    }
-
-    @Override
-    public boolean loggingEnabled() {
-        return materialized.loggingEnabled();
-    }
-
-    @Override
-    public String name() {
-        return materialized.storeName();
-    }
-
     @Override
     public boolean isWindowStore() {
         return false;
@@ -152,26 +113,4 @@ public class KeyValueStoreMaterializer<K, V> implements 
StoreFactory {
         return materialized.storeSupplier() instanceof 
VersionedBytesStoreSupplier;
     }
 
-    @Override
-    public Map<String, String> logConfig() {
-        return materialized.logConfig();
-    }
-
-    @Override
-    public StoreFactory withCachingDisabled() {
-        materialized.withCachingDisabled();
-        return this;
-    }
-
-    @Override
-    public StoreFactory withLoggingDisabled() {
-        materialized.withLoggingDisabled();
-        return this;
-    }
-
-    @Override
-    public boolean isCompatibleWith(final StoreFactory storeFactory) {
-        return (storeFactory instanceof KeyValueStoreMaterializer)
-                && ((KeyValueStoreMaterializer<?, ?>) 
storeFactory).materialized.equals(materialized);
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 0f734143c92..9646f619b04 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.TopologyConfig;
@@ -59,6 +60,17 @@ public class MaterializedInternal<K, V, S extends 
StateStore> extends Materializ
         }
     }
 
+    public static StoreType parse(final String storeType) {
+        switch (storeType) {
+            case StreamsConfig.IN_MEMORY:
+                return StoreType.IN_MEMORY;
+            case StreamsConfig.ROCKS_DB:
+                return StoreType.ROCKS_DB;
+            default:
+                throw new IllegalStateException("Unexpected storeType: " + 
storeType);
+        }
+    }
+
     public String queryableStoreName() {
         return queryable ? storeName() : null;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
new file mode 100644
index 00000000000..390e4905f81
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+
+/**
+ * {@code MaterializedStoreFactory} is the base class for any {@link 
StoreFactory} that
+ * wraps a {@link MaterializedInternal} instance.
+ */
+public abstract class MaterializedStoreFactory<K, V, S extends StateStore> 
implements StoreFactory {
+    protected final MaterializedInternal<K, V, S> materialized;
+    private final Set<String> connectedProcessorNames = new HashSet<>();
+    protected Materialized.StoreType defaultStoreType
+            = MaterializedInternal.parse(StreamsConfig.DEFAULT_DSL_STORE);
+
+    public MaterializedStoreFactory(final MaterializedInternal<K, V, S> 
materialized) {
+        this.materialized = materialized;
+
+        // this condition will never be false; in the next PR we will
+        // remove the initialization of storeType from MaterializedInternal
+        if (materialized.storeType() != null) {
+            defaultStoreType = materialized.storeType;
+        }
+    }
+
+    @Override
+    public void configure(final StreamsConfig config) {
+        // in a follow-up PR, this will set the defaultStoreType to the 
configured value
+    }
+
+    @Override
+    public Set<String> connectedProcessorNames() {
+        return connectedProcessorNames;
+    }
+
+    @Override
+    public boolean loggingEnabled() {
+        return materialized.loggingEnabled();
+    }
+
+    @Override
+    public String name() {
+        return materialized.storeName();
+    }
+
+    @Override
+    public Map<String, String> logConfig() {
+        return materialized.logConfig();
+    }
+
+    @Override
+    public StoreFactory withCachingDisabled() {
+        materialized.withCachingDisabled();
+        return this;
+    }
+
+    @Override
+    public StoreFactory withLoggingDisabled() {
+        materialized.withLoggingDisabled();
+        return this;
+    }
+
+    @Override
+    public boolean isCompatibleWith(final StoreFactory storeFactory) {
+        return (storeFactory instanceof MaterializedStoreFactory)
+                && ((MaterializedStoreFactory<?, ?, ?>) 
storeFactory).materialized.equals(materialized);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
new file mode 100644
index 00000000000..c3701c5fd80
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {
+
+    private final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> 
materialized;
+    private final SessionWindows sessionWindows;
+    private final EmitStrategy emitStrategy;
+    private final long retentionPeriod;
+
+    public SessionStoreMaterializer(
+            final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> 
materialized,
+            final SessionWindows sessionWindows,
+            final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.materialized = materialized;
+        this.sessionWindows = sessionWindows;
+        this.emitStrategy = emitStrategy;
+
+        retentionPeriod = retentionPeriod();
+        if ((sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs()) 
> retentionPeriod) {
+            throw new IllegalArgumentException("The retention period of the 
session store "
+                    + materialized.storeName()
+                    + " must be no smaller than the session inactivity gap 
plus the"
+                    + " grace period."
+                    + " Got gap=[" + sessionWindows.inactivityGap() + "],"
+                    + " grace=[" + sessionWindows.gracePeriodMs() + "],"
+                    + " retention=[" + retentionPeriod + "]");
+        }
+    }
+
+    @Override
+    public StateStore build() {
+        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) 
materialized.storeSupplier();
+        if (supplier == null) {
+
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemorySessionStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod)
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                            new RocksDbTimeOrderedSessionBytesStoreSupplier(
+                                    materialized.storeName(),
+                                    retentionPeriod,
+                                    true) :
+                            Stores.persistentSessionStore(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod)
+                            );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<SessionStore<K, V>> builder = 
Stores.sessionStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        // do not enable cache if the emit final strategy is used
+        if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+            builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return materialized.retention() != null
+                ? materialized.retention().toMillis()
+                : sessionWindows.inactivityGap() + 
sessionWindows.gracePeriodMs();
+    }
+
+    @Override
+    public long historyRetention() {
+        throw new IllegalStateException(
+                "historyRetention is not supported when not a versioned 
store");
+    }
+
+    @Override
+    public boolean isWindowStore() {
+        return true;
+    }
+
+    @Override
+    public boolean isVersionedStore() {
+        return false;
+    }
+
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
index bbe7ce54fb6..fc230832984 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -29,14 +30,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -97,7 +92,10 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> 
extends
             groupPatterns,
             initializer,
             new NamedInternal(named),
-            materialize(materializedInternal),
+            new SessionStoreMaterializer<>(
+                    materializedInternal,
+                    sessionWindows,
+                    EmitStrategy.onWindowUpdate()),
             materializedInternal.keySerde() != null ?
                 new WindowedSerdes.SessionWindowedSerde<>(
                     materializedInternal.keySerde()) :
@@ -108,56 +106,4 @@ public class SessionWindowedCogroupedKStreamImpl<K, V> 
extends
             sessionMerger);
     }
 
-    private StoreFactory materialize(final MaterializedInternal<K, V, 
SessionStore<Bytes, byte[]>> materialized) {
-        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ?
-                materialized.retention().toMillis() : 
sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs();
-
-            if ((sessionWindows.inactivityGap() + 
sessionWindows.gracePeriodMs()) > retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the session store "
-                    + materialized.storeName()
-                    + " must be no smaller than the session inactivity gap 
plus the"
-                    + " grace period."
-                    + " Got gap=[" + sessionWindows.inactivityGap() + "],"
-                    + " grace=[" + sessionWindows.gracePeriodMs() + "],"
-                    + " retention=[" + retentionPeriod + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemorySessionStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod)
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = Stores.persistentSessionStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod)
-                    );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<SessionStore<K, V>> builder = 
Stores.sessionStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde()
-        );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        }
-        return new StoreBuilderWrapper(builder);
-    }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 643bc6b2593..d8f3770b79a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -32,15 +32,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.WindowedSerdes;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
 import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
 
-import java.time.Duration;
 import java.util.Objects;
 import java.util.Set;
 
@@ -117,7 +110,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         final String aggregateName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            materialize(materializedInternal),
+            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -167,7 +160,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         final String reduceName = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
         return aggregateBuilder.build(
             new NamedInternal(reduceName),
-            materialize(materializedInternal),
+            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -226,7 +219,7 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            materialize(materializedInternal),
+            new SessionStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamSessionWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -240,67 +233,6 @@ public class SessionWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
             false);
     }
 
-    private <VR> StoreFactory materialize(final MaterializedInternal<K, VR, 
SessionStore<Bytes, byte[]>> materialized) {
-        SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ?
-                materialized.retention().toMillis() : windows.inactivityGap() 
+ windows.gracePeriodMs();
-
-            if ((windows.inactivityGap() + windows.gracePeriodMs()) > 
retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the session store "
-                                                       + 
materialized.storeName()
-                                                       + " must be no smaller 
than the session inactivity gap plus the"
-                                                       + " grace period."
-                                                       + " Got gap=[" + 
windows.inactivityGap() + "],"
-                                                       + " grace=[" + 
windows.gracePeriodMs() + "],"
-                                                       + " retention=[" + 
retentionPeriod + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemorySessionStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod)
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
-                        new RocksDbTimeOrderedSessionBytesStoreSupplier(
-                            materialized.storeName(),
-                            retentionPeriod,
-                            true) :
-                        Stores.persistentSessionStore(
-                            materialized.storeName(),
-                            Duration.ofMillis(retentionPeriod)
-                        );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<SessionStore<K, VR>> builder = 
Stores.sessionStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde()
-        );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        // do not enable cache if the emit final strategy is used
-        if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
-            builder.withCachingEnabled();
-        } else {
-            builder.withCachingDisabled();
-        }
-
-        return new StoreBuilderWrapper(builder);
-    }
-
     private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V> 
aggregator) {
         return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo, 
aggOne);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
new file mode 100644
index 00000000000..702abfed6e8
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class SlidingWindowStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
+
+    private final SlidingWindows windows;
+    private final EmitStrategy emitStrategy;
+    private final long retentionPeriod;
+
+    public SlidingWindowStoreMaterializer(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> 
materialized,
+            final SlidingWindows windows,
+            final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.windows = windows;
+        this.emitStrategy = emitStrategy;
+
+        retentionPeriod = retentionPeriod();
+        // large retention time to ensure that all existing windows needed to 
create new sliding windows can be accessed
+        // earliest window start time we could need to create corresponding 
right window would be recordTime - 2 * timeDifference
+        if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > 
retentionPeriod) {
+            throw new IllegalArgumentException("The retention period of the 
window store "
+                    + materialized.storeName()
+                    + " must be no smaller than 2 * time difference plus the 
grace period."
+                    + " Got time difference=[" + windows.timeDifferenceMs() + 
"],"
+                    + " grace=[" + windows.gracePeriodMs() + "],"
+                    + " retention=[" + retentionPeriod + "]");
+        }
+    }
+
+    @Override
+    public StateStore build() {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
+        if (supplier == null) {
+
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemoryWindowStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.timeDifferenceMs()),
+                            false
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                            
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    
Duration.ofMillis(windows.timeDifferenceMs()),
+                                    false,
+                                    true
+                            ) :
+                            Stores.persistentTimestampedWindowStore(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    
Duration.ofMillis(windows.timeDifferenceMs()),
+                                    false
+                            );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
+                .timestampedWindowStoreBuilder(
+                        supplier,
+                        materialized.keySerde(),
+                        materialized.valueSerde()
+                );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        // do not enable cache if the emit final strategy is used
+        if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+            builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return  materialized.retention() != null
+                ? materialized.retention().toMillis()
+                : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+    }
+
+    @Override
+    public long historyRetention() {
+        throw new IllegalStateException(
+                "historyRetention is not supported when not a versioned 
store");
+    }
+
+    @Override
+    public boolean isWindowStore() {
+        return true;
+    }
+
+    @Override
+    public boolean isVersionedStore() {
+        return false;
+    }
+
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
index de24fdd838f..e65dc1d07dc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -27,15 +28,8 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -91,7 +85,7 @@ public class SlidingWindowedCogroupedKStreamImpl<K, V> 
extends AbstractStream<K,
             groupPatterns,
             initializer,
             new NamedInternal(named),
-            materialize(materializedInternal),
+            new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, EmitStrategy.onWindowUpdate()),
             materializedInternal.keySerde() != null ?
                 new FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs())
                 : null,
@@ -100,62 +94,4 @@ public class SlidingWindowedCogroupedKStreamImpl<K, V> 
extends AbstractStream<K,
             windows);
     }
 
-    private StoreFactory materialize(final MaterializedInternal<K, V, 
WindowStore<Bytes, byte[]>> materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ? 
materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * 
windows.timeDifferenceMs();
-
-            if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > 
retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the window store "
-                    + name
-                    + " must be no smaller than 2 * time difference plus the 
grace period."
-                    + " Got time difference=[" + windows.timeDifferenceMs() + 
"],"
-                    + " grace=[" + windows.gracePeriodMs()
-                    + "],"
-                    + " retention=[" + retentionPeriod
-                    + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.timeDifferenceMs()),
-                        false
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = Stores.persistentTimestampedWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.timeDifferenceMs()),
-                        false
-                    );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
-            .timestampedWindowStoreBuilder(
-                supplier,
-                materialized.keySerde(),
-                materialized.valueSerde()
-            );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        } else {
-            builder.withCachingDisabled();
-        }
-        return new StoreBuilderWrapper(builder);
-    }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 4f29bd3093b..3cb7b3f29bd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.EmitStrategy;
-import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -31,18 +30,10 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.time.Duration;
 import java.util.Objects;
 import java.util.Set;
-import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
 
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -102,7 +93,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         return aggregateBuilder.build(
                 new NamedInternal(aggregateName),
-                materialize(materializedInternal),
+                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
                 new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
@@ -147,7 +138,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         return aggregateBuilder.build(
                 new NamedInternal(aggregateName),
-                materialize(materializedInternal),
+                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
                 new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, initializer, aggregator),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
@@ -193,7 +184,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
 
         return aggregateBuilder.build(
                 new NamedInternal(reduceName),
-                materialize(materializedInternal),
+                new SlidingWindowStoreMaterializer<>(materializedInternal, 
windows, emitStrategy),
                 new KStreamSlidingWindowAggregate<>(windows, 
materializedInternal.storeName(), emitStrategy, 
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
                 materializedInternal.queryableStoreName(),
                 materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.timeDifferenceMs()) : null,
@@ -207,72 +198,6 @@ public class SlidingWindowedKStreamImpl<K, V> extends 
AbstractStream<K, V> imple
         return this;
     }
 
-    private <VR> StoreFactory materialize(final MaterializedInternal<K, VR, 
WindowStore<Bytes, byte[]>> materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ? 
materialized.retention().toMillis() : windows.gracePeriodMs() + 2 * 
windows.timeDifferenceMs();
-
-            // large retention time to ensure that all existing windows needed 
to create new sliding windows can be accessed
-            // earliest window start time we could need to create 
corresponding right window would be recordTime - 2 * timeDifference
-            if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) > 
retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the window store "
-                        + name + " must be no smaller than 2 * time difference 
plus the grace period."
-                        + " Got time difference=[" + 
windows.timeDifferenceMs() + "],"
-                        + " grace=[" + windows.gracePeriodMs() + "],"
-                        + " retention=[" + retentionPeriod + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.timeDifferenceMs()),
-                        false
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = emitStrategy.type() == 
StrategyType.ON_WINDOW_CLOSE ?
-                        
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
-                            materialized.storeName(),
-                            Duration.ofMillis(retentionPeriod),
-                            Duration.ofMillis(windows.timeDifferenceMs()),
-                            false,
-                            true
-                        ) :
-                        Stores.persistentTimestampedWindowStore(
-                            materialized.storeName(),
-                            Duration.ofMillis(retentionPeriod),
-                            Duration.ofMillis(windows.timeDifferenceMs()),
-                            false
-                        );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<TimestampedWindowStore<K, VR>> builder = 
Stores.timestampedWindowStoreBuilder(
-                supplier,
-                materialized.keySerde(),
-                materialized.valueSerde()
-        );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        // do not enable cache if the emit final strategy is used
-        if (materialized.cachingEnabled() && emitStrategy.type() != 
StrategyType.ON_WINDOW_CLOSE) {
-            builder.withCachingEnabled();
-        } else {
-            builder.withCachingDisabled();
-        }
-        return new StoreBuilderWrapper(builder);
-    }
-
     private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) 
{
         return (aggKey, value, aggregate) -> aggregate == null ? value : 
reducer.apply(aggregate, value);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
index 4236af7af53..4bda23c2aa9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -28,15 +29,8 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -95,7 +89,7 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends 
Window> extends Ab
             groupPatterns,
             initializer,
             new NamedInternal(named),
-            materialize(materializedInternal),
+            new WindowStoreMaterializer<>(materializedInternal, windows, 
EmitStrategy.onWindowUpdate()),
             materializedInternal.keySerde() != null ?
                 new FullTimeWindowedSerde<>(materializedInternal.keySerde(), 
windows.size())
                 : null,
@@ -103,63 +97,4 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W 
extends Window> extends Ab
             materializedInternal.queryableStoreName(),
             windows);
     }
-
-    private StoreFactory materialize(
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> 
materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ?
-                materialized.retention().toMillis() : windows.size() + 
windows.gracePeriodMs();
-
-            if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the window store "
-                        + name
-                        + " must be no smaller than its window size plus the 
grace period."
-                        + " Got size=[" + windows.size() + "],"
-                        + " grace=[" + windows.gracePeriodMs()
-                        + "],"
-                        + " retention=[" + retentionPeriod
-                        + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.size()),
-                        false
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = Stores.persistentTimestampedWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.size()),
-                        false
-                    );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
-            .timestampedWindowStoreBuilder(
-                supplier,
-                materialized.keySerde(),
-                materialized.valueSerde()
-            );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        }
-        return new StoreBuilderWrapper(builder);
-    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 0c6b79ce4f6..b615e20714b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -33,18 +33,10 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.time.Duration;
 import java.util.Objects;
 import java.util.Set;
-import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
 
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
 import static 
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -113,7 +105,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            materialize(materializedInternal),
+            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -165,7 +157,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
 
         return aggregateBuilder.build(
             new NamedInternal(aggregateName),
-            materialize(materializedInternal),
+            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -216,7 +208,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
 
         return aggregateBuilder.build(
             new NamedInternal(reduceName),
-            materialize(materializedInternal),
+            new WindowStoreMaterializer<>(materializedInternal, windows, 
emitStrategy),
             new KStreamWindowAggregate<>(
                 windows,
                 materializedInternal.storeName(),
@@ -239,68 +231,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends 
Window> extends AbstractStr
         return this;
     }
 
-    private <VR> StoreFactory materialize(final MaterializedInternal<K, VR, 
WindowStore<Bytes, byte[]>> materialized) {
-        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
-        if (supplier == null) {
-            final long retentionPeriod = materialized.retention() != null ?
-                materialized.retention().toMillis() : windows.size() + 
windows.gracePeriodMs();
-
-            if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
-                throw new IllegalArgumentException("The retention period of 
the window store "
-                        + name + " must be no smaller than its window size 
plus the grace period."
-                        + " Got size=[" + windows.size() + "],"
-                        + " grace=[" + windows.gracePeriodMs() + "],"
-                        + " retention=[" + retentionPeriod + "]");
-            }
-
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryWindowStore(
-                        materialized.storeName(),
-                        Duration.ofMillis(retentionPeriod),
-                        Duration.ofMillis(windows.size()),
-                        false
-                    );
-                    break;
-                case ROCKS_DB:
-                    supplier = emitStrategy.type() == 
StrategyType.ON_WINDOW_CLOSE ?
-                        
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
-                            materialized.storeName(),
-                            Duration.ofMillis(retentionPeriod),
-                            Duration.ofMillis(windows.size()),
-                            false,
-                            false
-                        ) :
-                        Stores.persistentTimestampedWindowStore(
-                            materialized.storeName(),
-                            Duration.ofMillis(retentionPeriod),
-                            Duration.ofMillis(windows.size()),
-                            false
-                    );
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
-            }
-        }
-
-        final StoreBuilder<TimestampedWindowStore<K, VR>> builder = 
Stores.timestampedWindowStoreBuilder(
-            supplier,
-            materialized.keySerde(),
-            materialized.valueSerde()
-        );
-
-        if (materialized.loggingEnabled()) {
-            builder.withLoggingEnabled(materialized.logConfig());
-        } else {
-            builder.withLoggingDisabled();
-        }
-
-        if (materialized.cachingEnabled()) {
-            builder.withCachingEnabled();
-        }
-        return new StoreBuilderWrapper(builder);
-    }
-
     private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) 
{
         return (aggKey, value, aggregate) -> aggregate == null ? value : 
reducer.apply(aggregate, value);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
new file mode 100644
index 00000000000..349cca7985d
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K, 
V, WindowStore<Bytes, byte[]>> {
+
+    private final Windows<?> windows;
+    private final EmitStrategy emitStrategy;
+    private final long retentionPeriod;
+
+    public WindowStoreMaterializer(
+            final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> 
materialized,
+            final Windows<?> windows,
+            final EmitStrategy emitStrategy
+    ) {
+        super(materialized);
+        this.windows = windows;
+        this.emitStrategy = emitStrategy;
+
+        retentionPeriod = retentionPeriod();
+
+        if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+            throw new IllegalArgumentException("The retention period of the 
window store "
+                    + materialized.storeName() + " must be no smaller than its 
window size plus the grace period."
+                    + " Got size=[" + windows.size() + "],"
+                    + " grace=[" + windows.gracePeriodMs() + "],"
+                    + " retention=[" + retentionPeriod + "]");
+        }
+    }
+
+    @Override
+    public StateStore build() {
+        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier) 
materialized.storeSupplier();
+        if (supplier == null) {
+            switch (defaultStoreType) {
+                case IN_MEMORY:
+                    supplier = Stores.inMemoryWindowStore(
+                            materialized.storeName(),
+                            Duration.ofMillis(retentionPeriod),
+                            Duration.ofMillis(windows.size()),
+                            false
+                    );
+                    break;
+                case ROCKS_DB:
+                    supplier = emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+                            
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false,
+                                    false
+                            ) :
+                            Stores.persistentTimestampedWindowStore(
+                                    materialized.storeName(),
+                                    Duration.ofMillis(retentionPeriod),
+                                    Duration.ofMillis(windows.size()),
+                                    false
+                            );
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown store type: " + 
materialized.storeType());
+            }
+        }
+
+        final StoreBuilder<TimestampedWindowStore<K, V>> builder = 
Stores.timestampedWindowStoreBuilder(
+                supplier,
+                materialized.keySerde(),
+                materialized.valueSerde()
+        );
+
+        if (materialized.loggingEnabled()) {
+            builder.withLoggingEnabled(materialized.logConfig());
+        } else {
+            builder.withLoggingDisabled();
+        }
+
+        if (materialized.cachingEnabled()) {
+            builder.withCachingEnabled();
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public long retentionPeriod() {
+        return materialized.retention() != null
+                ? materialized.retention().toMillis()
+                : windows.size() + windows.gracePeriodMs();
+    }
+
+    @Override
+    public long historyRetention() {
+        throw new IllegalStateException(
+                "historyRetention is not supported when not a versioned 
store");
+    }
+
+    @Override
+    public boolean isWindowStore() {
+        return true;
+    }
+
+    @Override
+    public boolean isVersionedStore() {
+        return false;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9f7bc0206c6..87dc9301542 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -16,29 +16,6 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -60,6 +37,30 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
+
 public class InternalTopologyBuilder {
 
     public InternalTopologyBuilder() {

Reply via email to