This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 39cacca89b0 KAFKA-15774: refactor windowed stores to use StoreFactory
(#14708)
39cacca89b0 is described below
commit 39cacca89b0396398a783307e77df31b7ebf9e5e
Author: Almog Gavra <[email protected]>
AuthorDate: Fri Nov 10 18:19:11 2023 -0800
KAFKA-15774: refactor windowed stores to use StoreFactory (#14708)
This is a follow up from #14659 that ports the windowed classes to use the
StoreFactory abstraction as well. There's a side benefit of not duplicating the
materialization code twice for each StreamImpl/CogroupedStreamImpl class as
well.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Matthias Sax
<[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 4 +-
.../org/apache/kafka/streams/TopologyConfig.java | 3 +-
.../apache/kafka/streams/kstream/Materialized.java | 12 --
.../internals/KeyValueStoreMaterializer.java | 65 +---------
.../kstream/internals/MaterializedInternal.java | 12 ++
.../internals/MaterializedStoreFactory.java | 89 +++++++++++++
.../internals/SessionStoreMaterializer.java | 132 +++++++++++++++++++
.../SessionWindowedCogroupedKStreamImpl.java | 64 +---------
.../internals/SessionWindowedKStreamImpl.java | 74 +----------
.../internals/SlidingWindowStoreMaterializer.java | 140 +++++++++++++++++++++
.../SlidingWindowedCogroupedKStreamImpl.java | 68 +---------
.../internals/SlidingWindowedKStreamImpl.java | 81 +-----------
.../TimeWindowedCogroupedKStreamImpl.java | 69 +---------
.../kstream/internals/TimeWindowedKStreamImpl.java | 76 +----------
.../kstream/internals/WindowStoreMaterializer.java | 132 +++++++++++++++++++
.../internals/InternalTopologyBuilder.java | 47 +++----
16 files changed, 553 insertions(+), 515 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 4d82703c5f0..3d37d13e9b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -535,7 +535,7 @@ public class StreamsConfig extends AbstractConfig {
public static final String ROCKS_DB = "rocksDB";
public static final String IN_MEMORY = "in_memory";
- public static final String DEFAULT_DSL_STORE_DEFAULT = ROCKS_DB;
+ public static final String DEFAULT_DSL_STORE = ROCKS_DB;
/** {@code default.windowed.key.serde.inner} */
@SuppressWarnings("WeakerAccess")
@@ -1001,7 +1001,7 @@ public class StreamsConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(DEFAULT_DSL_STORE_CONFIG,
Type.STRING,
- DEFAULT_DSL_STORE_DEFAULT,
+ DEFAULT_DSL_STORE,
in(ROCKS_DB, IN_MEMORY),
Importance.LOW,
DEFAULT_DSL_STORE_DOC)
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index 38991254e13..70f774ef860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
+import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;
import
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
@@ -218,7 +219,7 @@ public class TopologyConfig extends AbstractConfig {
}
public Materialized.StoreType parseStoreType() {
- return Materialized.StoreType.parse(storeType);
+ return MaterializedInternal.parse(storeType);
}
public boolean isNamedTopology() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index b45bbff0e49..9dfb4189d6c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -71,17 +70,6 @@ public class Materialized<K, V, S extends StateStore> {
public enum StoreType {
ROCKS_DB,
IN_MEMORY;
-
- public static StoreType parse(final String storeType) {
- switch (storeType) {
- case StreamsConfig.IN_MEMORY:
- return StoreType.IN_MEMORY;
- case StreamsConfig.ROCKS_DB:
- return StoreType.ROCKS_DB;
- default:
- throw new IllegalStateException("Unexpected storeType: " +
storeType);
- }
- }
}
private Materialized(final StoreSupplier<S> storeSupplier) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 89814617ff9..3361d65c464 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,10 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -31,38 +28,17 @@ import
org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
/**
* Materializes a key-value store as either a {@link
TimestampedKeyValueStoreBuilder} or a
* {@link VersionedKeyValueStoreBuilder} depending on whether the store is
versioned or not.
*/
-public class KeyValueStoreMaterializer<K, V> implements StoreFactory {
+public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V, KeyValueStore<Bytes, byte[]>> {
private static final Logger LOG =
LoggerFactory.getLogger(KeyValueStoreMaterializer.class);
- private final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materialized;
- private final Set<String> connectedProcessorNames = new HashSet<>();
-
- private Materialized.StoreType defaultStoreType
- =
Materialized.StoreType.parse(StreamsConfig.DEFAULT_DSL_STORE_DEFAULT);
-
public KeyValueStoreMaterializer(
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materialized
) {
- this.materialized = materialized;
-
- // this condition will never be false; in the next PR we will
- // remove the initialization of storeType from MaterializedInternal
- if (materialized.storeType() != null) {
- defaultStoreType = materialized.storeType;
- }
- }
-
- @Override
- public void configure(final StreamsConfig config) {
- // in a follow-up PR, this will set the defaultStoreType to the
configured value
+ super(materialized);
}
@Override
@@ -127,21 +103,6 @@ public class KeyValueStoreMaterializer<K, V> implements
StoreFactory {
return ((VersionedBytesStoreSupplier)
materialized.storeSupplier()).historyRetentionMs();
}
- @Override
- public Set<String> connectedProcessorNames() {
- return connectedProcessorNames;
- }
-
- @Override
- public boolean loggingEnabled() {
- return materialized.loggingEnabled();
- }
-
- @Override
- public String name() {
- return materialized.storeName();
- }
-
@Override
public boolean isWindowStore() {
return false;
@@ -152,26 +113,4 @@ public class KeyValueStoreMaterializer<K, V> implements
StoreFactory {
return materialized.storeSupplier() instanceof
VersionedBytesStoreSupplier;
}
- @Override
- public Map<String, String> logConfig() {
- return materialized.logConfig();
- }
-
- @Override
- public StoreFactory withCachingDisabled() {
- materialized.withCachingDisabled();
- return this;
- }
-
- @Override
- public StoreFactory withLoggingDisabled() {
- materialized.withLoggingDisabled();
- return this;
- }
-
- @Override
- public boolean isCompatibleWith(final StoreFactory storeFactory) {
- return (storeFactory instanceof KeyValueStoreMaterializer)
- && ((KeyValueStoreMaterializer<?, ?>)
storeFactory).materialized.equals(materialized);
- }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index 0f734143c92..9646f619b04 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.TopologyConfig;
@@ -59,6 +60,17 @@ public class MaterializedInternal<K, V, S extends
StateStore> extends Materializ
}
}
+ public static StoreType parse(final String storeType) {
+ switch (storeType) {
+ case StreamsConfig.IN_MEMORY:
+ return StoreType.IN_MEMORY;
+ case StreamsConfig.ROCKS_DB:
+ return StoreType.ROCKS_DB;
+ default:
+ throw new IllegalStateException("Unexpected storeType: " +
storeType);
+ }
+ }
+
public String queryableStoreName() {
return queryable ? storeName() : null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
new file mode 100644
index 00000000000..390e4905f81
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedStoreFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+
+/**
+ * {@code MaterializedStoreFactory} is the base class for any {@link
StoreFactory} that
+ * wraps a {@link MaterializedInternal} instance.
+ */
+public abstract class MaterializedStoreFactory<K, V, S extends StateStore>
implements StoreFactory {
+ protected final MaterializedInternal<K, V, S> materialized;
+ private final Set<String> connectedProcessorNames = new HashSet<>();
+ protected Materialized.StoreType defaultStoreType
+ = MaterializedInternal.parse(StreamsConfig.DEFAULT_DSL_STORE);
+
+ public MaterializedStoreFactory(final MaterializedInternal<K, V, S>
materialized) {
+ this.materialized = materialized;
+
+ // this condition will never be false; in the next PR we will
+ // remove the initialization of storeType from MaterializedInternal
+ if (materialized.storeType() != null) {
+ defaultStoreType = materialized.storeType;
+ }
+ }
+
+ @Override
+ public void configure(final StreamsConfig config) {
+ // in a follow-up PR, this will set the defaultStoreType to the
configured value
+ }
+
+ @Override
+ public Set<String> connectedProcessorNames() {
+ return connectedProcessorNames;
+ }
+
+ @Override
+ public boolean loggingEnabled() {
+ return materialized.loggingEnabled();
+ }
+
+ @Override
+ public String name() {
+ return materialized.storeName();
+ }
+
+ @Override
+ public Map<String, String> logConfig() {
+ return materialized.logConfig();
+ }
+
+ @Override
+ public StoreFactory withCachingDisabled() {
+ materialized.withCachingDisabled();
+ return this;
+ }
+
+ @Override
+ public StoreFactory withLoggingDisabled() {
+ materialized.withLoggingDisabled();
+ return this;
+ }
+
+ @Override
+ public boolean isCompatibleWith(final StoreFactory storeFactory) {
+ return (storeFactory instanceof MaterializedStoreFactory)
+ && ((MaterializedStoreFactory<?, ?, ?>)
storeFactory).materialized.equals(materialized);
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
new file mode 100644
index 00000000000..c3701c5fd80
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class SessionStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V, SessionStore<Bytes, byte[]>> {
+
+ private final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>>
materialized;
+ private final SessionWindows sessionWindows;
+ private final EmitStrategy emitStrategy;
+ private final long retentionPeriod;
+
+ public SessionStoreMaterializer(
+ final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>>
materialized,
+ final SessionWindows sessionWindows,
+ final EmitStrategy emitStrategy
+ ) {
+ super(materialized);
+ this.materialized = materialized;
+ this.sessionWindows = sessionWindows;
+ this.emitStrategy = emitStrategy;
+
+ retentionPeriod = retentionPeriod();
+ if ((sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs())
> retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the
session store "
+ + materialized.storeName()
+ + " must be no smaller than the session inactivity gap
plus the"
+ + " grace period."
+ + " Got gap=[" + sessionWindows.inactivityGap() + "],"
+ + " grace=[" + sessionWindows.gracePeriodMs() + "],"
+ + " retention=[" + retentionPeriod + "]");
+ }
+ }
+
+ @Override
+ public StateStore build() {
+ SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier)
materialized.storeSupplier();
+ if (supplier == null) {
+
+ switch (defaultStoreType) {
+ case IN_MEMORY:
+ supplier = Stores.inMemorySessionStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod)
+ );
+ break;
+ case ROCKS_DB:
+ supplier = emitStrategy.type() ==
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+ new RocksDbTimeOrderedSessionBytesStoreSupplier(
+ materialized.storeName(),
+ retentionPeriod,
+ true) :
+ Stores.persistentSessionStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod)
+ );
+ break;
+ default:
+ throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
+ }
+ }
+
+ final StoreBuilder<SessionStore<K, V>> builder =
Stores.sessionStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ // do not enable cache if the emit final strategy is used
+ if (materialized.cachingEnabled() && emitStrategy.type() !=
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return materialized.retention() != null
+ ? materialized.retention().toMillis()
+ : sessionWindows.inactivityGap() +
sessionWindows.gracePeriodMs();
+ }
+
+ @Override
+ public long historyRetention() {
+ throw new IllegalStateException(
+ "historyRetention is not supported when not a versioned
store");
+ }
+
+ @Override
+ public boolean isWindowStore() {
+ return true;
+ }
+
+ @Override
+ public boolean isVersionedStore() {
+ return false;
+ }
+
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
index bbe7ce54fb6..fc230832984 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -29,14 +30,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -97,7 +92,10 @@ public class SessionWindowedCogroupedKStreamImpl<K, V>
extends
groupPatterns,
initializer,
new NamedInternal(named),
- materialize(materializedInternal),
+ new SessionStoreMaterializer<>(
+ materializedInternal,
+ sessionWindows,
+ EmitStrategy.onWindowUpdate()),
materializedInternal.keySerde() != null ?
new WindowedSerdes.SessionWindowedSerde<>(
materializedInternal.keySerde()) :
@@ -108,56 +106,4 @@ public class SessionWindowedCogroupedKStreamImpl<K, V>
extends
sessionMerger);
}
- private StoreFactory materialize(final MaterializedInternal<K, V,
SessionStore<Bytes, byte[]>> materialized) {
- SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
- materialized.retention().toMillis() :
sessionWindows.inactivityGap() + sessionWindows.gracePeriodMs();
-
- if ((sessionWindows.inactivityGap() +
sessionWindows.gracePeriodMs()) > retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the session store "
- + materialized.storeName()
- + " must be no smaller than the session inactivity gap
plus the"
- + " grace period."
- + " Got gap=[" + sessionWindows.inactivityGap() + "],"
- + " grace=[" + sessionWindows.gracePeriodMs() + "],"
- + " retention=[" + retentionPeriod + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemorySessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
- break;
- case ROCKS_DB:
- supplier = Stores.persistentSessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<SessionStore<K, V>> builder =
Stores.sessionStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- if (materialized.cachingEnabled()) {
- builder.withCachingEnabled();
- }
- return new StoreBuilderWrapper(builder);
- }
-
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index 643bc6b2593..d8f3770b79a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -32,15 +32,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
-import java.time.Duration;
import java.util.Objects;
import java.util.Set;
@@ -117,7 +110,7 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
final String aggregateName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -167,7 +160,7 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
final String reduceName = new
NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME);
return aggregateBuilder.build(
new NamedInternal(reduceName),
- materialize(materializedInternal),
+ new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -226,7 +219,7 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new SessionStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamSessionWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -240,67 +233,6 @@ public class SessionWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
false);
}
- private <VR> StoreFactory materialize(final MaterializedInternal<K, VR,
SessionStore<Bytes, byte[]>> materialized) {
- SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
- materialized.retention().toMillis() : windows.inactivityGap()
+ windows.gracePeriodMs();
-
- if ((windows.inactivityGap() + windows.gracePeriodMs()) >
retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the session store "
- +
materialized.storeName()
- + " must be no smaller
than the session inactivity gap plus the"
- + " grace period."
- + " Got gap=[" +
windows.inactivityGap() + "],"
- + " grace=[" +
windows.gracePeriodMs() + "],"
- + " retention=[" +
retentionPeriod + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemorySessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
- break;
- case ROCKS_DB:
- supplier = emitStrategy.type() ==
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
- new RocksDbTimeOrderedSessionBytesStoreSupplier(
- materialized.storeName(),
- retentionPeriod,
- true) :
- Stores.persistentSessionStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod)
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<SessionStore<K, VR>> builder =
Stores.sessionStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- // do not enable cache if the emit final strategy is used
- if (materialized.cachingEnabled() && emitStrategy.type() !=
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
- builder.withCachingEnabled();
- } else {
- builder.withCachingDisabled();
- }
-
- return new StoreBuilderWrapper(builder);
- }
-
private Merger<K, V> mergerForAggregator(final Aggregator<K, V, V>
aggregator) {
return (aggKey, aggOne, aggTwo) -> aggregator.apply(aggKey, aggTwo,
aggOne);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
new file mode 100644
index 00000000000..702abfed6e8
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowStoreMaterializer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class SlidingWindowStoreMaterializer<K, V> extends
MaterializedStoreFactory<K, V, WindowStore<Bytes, byte[]>> {
+
+ private final SlidingWindows windows;
+ private final EmitStrategy emitStrategy;
+ private final long retentionPeriod;
+
+ public SlidingWindowStoreMaterializer(
+ final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>>
materialized,
+ final SlidingWindows windows,
+ final EmitStrategy emitStrategy
+ ) {
+ super(materialized);
+ this.windows = windows;
+ this.emitStrategy = emitStrategy;
+
+ retentionPeriod = retentionPeriod();
+ // large retention time to ensure that all existing windows needed to
create new sliding windows can be accessed
+ // earliest window start time we could need to create corresponding
right window would be recordTime - 2 * timeDifference
+ if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) >
retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the
window store "
+ + materialized.storeName()
+ + " must be no smaller than 2 * time difference plus the
grace period."
+ + " Got time difference=[" + windows.timeDifferenceMs() +
"],"
+ + " grace=[" + windows.gracePeriodMs() + "],"
+ + " retention=[" + retentionPeriod + "]");
+ }
+ }
+
+ @Override
+ public StateStore build() {
+ WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
+ if (supplier == null) {
+
+ switch (defaultStoreType) {
+ case IN_MEMORY:
+ supplier = Stores.inMemoryWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.timeDifferenceMs()),
+ false
+ );
+ break;
+ case ROCKS_DB:
+ supplier = emitStrategy.type() ==
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+
Duration.ofMillis(windows.timeDifferenceMs()),
+ false,
+ true
+ ) :
+ Stores.persistentTimestampedWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+
Duration.ofMillis(windows.timeDifferenceMs()),
+ false
+ );
+ break;
+ default:
+ throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
+ }
+ }
+
+ final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
+ .timestampedWindowStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ // do not enable cache if the emit final strategy is used
+ if (materialized.cachingEnabled() && emitStrategy.type() !=
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ builder.withCachingEnabled();
+ } else {
+ builder.withCachingDisabled();
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return materialized.retention() != null
+ ? materialized.retention().toMillis()
+ : windows.gracePeriodMs() + 2 * windows.timeDifferenceMs();
+ }
+
+ @Override
+ public long historyRetention() {
+ throw new IllegalStateException(
+ "historyRetention is not supported when not a versioned
store");
+ }
+
+ @Override
+ public boolean isWindowStore() {
+ return true;
+ }
+
+ @Override
+ public boolean isVersionedStore() {
+ return false;
+ }
+
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
index de24fdd838f..e65dc1d07dc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -27,15 +28,8 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -91,7 +85,7 @@ public class SlidingWindowedCogroupedKStreamImpl<K, V>
extends AbstractStream<K,
groupPatterns,
initializer,
new NamedInternal(named),
- materialize(materializedInternal),
+ new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, EmitStrategy.onWindowUpdate()),
materializedInternal.keySerde() != null ?
new FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs())
: null,
@@ -100,62 +94,4 @@ public class SlidingWindowedCogroupedKStreamImpl<K, V>
extends AbstractStream<K,
windows);
}
- private StoreFactory materialize(final MaterializedInternal<K, V,
WindowStore<Bytes, byte[]>> materialized) {
- WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : windows.gracePeriodMs() + 2 *
windows.timeDifferenceMs();
-
- if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) >
retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the window store "
- + name
- + " must be no smaller than 2 * time difference plus the
grace period."
- + " Got time difference=[" + windows.timeDifferenceMs() +
"],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + retentionPeriod
- + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemoryWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.timeDifferenceMs()),
- false
- );
- break;
- case ROCKS_DB:
- supplier = Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.timeDifferenceMs()),
- false
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
- .timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
- if (materialized.cachingEnabled()) {
- builder.withCachingEnabled();
- } else {
- builder.withCachingDisabled();
- }
- return new StoreBuilderWrapper(builder);
- }
-
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
index 4f29bd3093b..3cb7b3f29bd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
-import org.apache.kafka.streams.kstream.EmitStrategy.StrategyType;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -31,18 +30,10 @@ import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import java.time.Duration;
import java.util.Objects;
import java.util.Set;
-import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import static
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -102,7 +93,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy,
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
@@ -147,7 +138,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy, initializer, aggregator),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
@@ -193,7 +184,7 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
return aggregateBuilder.build(
new NamedInternal(reduceName),
- materialize(materializedInternal),
+ new SlidingWindowStoreMaterializer<>(materializedInternal,
windows, emitStrategy),
new KStreamSlidingWindowAggregate<>(windows,
materializedInternal.storeName(), emitStrategy,
aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)),
materializedInternal.queryableStoreName(),
materializedInternal.keySerde() != null ? new
FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.timeDifferenceMs()) : null,
@@ -207,72 +198,6 @@ public class SlidingWindowedKStreamImpl<K, V> extends
AbstractStream<K, V> imple
return this;
}
- private <VR> StoreFactory materialize(final MaterializedInternal<K, VR,
WindowStore<Bytes, byte[]>> materialized) {
- WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
materialized.retention().toMillis() : windows.gracePeriodMs() + 2 *
windows.timeDifferenceMs();
-
- // large retention time to ensure that all existing windows needed
to create new sliding windows can be accessed
- // earliest window start time we could need to create
corresponding right window would be recordTime - 2 * timeDifference
- if ((windows.timeDifferenceMs() * 2 + windows.gracePeriodMs()) >
retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the window store "
- + name + " must be no smaller than 2 * time difference
plus the grace period."
- + " Got time difference=[" +
windows.timeDifferenceMs() + "],"
- + " grace=[" + windows.gracePeriodMs() + "],"
- + " retention=[" + retentionPeriod + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemoryWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.timeDifferenceMs()),
- false
- );
- break;
- case ROCKS_DB:
- supplier = emitStrategy.type() ==
StrategyType.ON_WINDOW_CLOSE ?
-
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.timeDifferenceMs()),
- false,
- true
- ) :
- Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.timeDifferenceMs()),
- false
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<TimestampedWindowStore<K, VR>> builder =
Stores.timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- // do not enable cache if the emit final strategy is used
- if (materialized.cachingEnabled() && emitStrategy.type() !=
StrategyType.ON_WINDOW_CLOSE) {
- builder.withCachingEnabled();
- } else {
- builder.withCachingDisabled();
- }
- return new StoreBuilderWrapper(builder);
- }
-
private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer)
{
return (aggKey, value, aggregate) -> aggregate == null ? value :
reducer.apply(aggregate, value);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
index 4236af7af53..4bda23c2aa9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImpl.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -28,15 +29,8 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -95,7 +89,7 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W extends
Window> extends Ab
groupPatterns,
initializer,
new NamedInternal(named),
- materialize(materializedInternal),
+ new WindowStoreMaterializer<>(materializedInternal, windows,
EmitStrategy.onWindowUpdate()),
materializedInternal.keySerde() != null ?
new FullTimeWindowedSerde<>(materializedInternal.keySerde(),
windows.size())
: null,
@@ -103,63 +97,4 @@ public class TimeWindowedCogroupedKStreamImpl<K, V, W
extends Window> extends Ab
materializedInternal.queryableStoreName(),
windows);
}
-
- private StoreFactory materialize(
- final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>>
materialized) {
- WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
- materialized.retention().toMillis() : windows.size() +
windows.gracePeriodMs();
-
- if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the window store "
- + name
- + " must be no smaller than its window size plus the
grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs()
- + "],"
- + " retention=[" + retentionPeriod
- + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemoryWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
- );
- break;
- case ROCKS_DB:
- supplier = Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<TimestampedWindowStore<K, V>> builder = Stores
- .timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- if (materialized.cachingEnabled()) {
- builder.withCachingEnabled();
- }
- return new StoreBuilderWrapper(builder);
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index 0c6b79ce4f6..b615e20714b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -33,18 +33,10 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
-import org.apache.kafka.streams.processor.internals.StoreFactory;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
-import java.time.Duration;
import java.util.Objects;
import java.util.Set;
-import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import static
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME;
import static
org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME;
@@ -113,7 +105,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -165,7 +157,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
return aggregateBuilder.build(
new NamedInternal(aggregateName),
- materialize(materializedInternal),
+ new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -216,7 +208,7 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
return aggregateBuilder.build(
new NamedInternal(reduceName),
- materialize(materializedInternal),
+ new WindowStoreMaterializer<>(materializedInternal, windows,
emitStrategy),
new KStreamWindowAggregate<>(
windows,
materializedInternal.storeName(),
@@ -239,68 +231,6 @@ public class TimeWindowedKStreamImpl<K, V, W extends
Window> extends AbstractStr
return this;
}
- private <VR> StoreFactory materialize(final MaterializedInternal<K, VR,
WindowStore<Bytes, byte[]>> materialized) {
- WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
- if (supplier == null) {
- final long retentionPeriod = materialized.retention() != null ?
- materialized.retention().toMillis() : windows.size() +
windows.gracePeriodMs();
-
- if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
- throw new IllegalArgumentException("The retention period of
the window store "
- + name + " must be no smaller than its window size
plus the grace period."
- + " Got size=[" + windows.size() + "],"
- + " grace=[" + windows.gracePeriodMs() + "],"
- + " retention=[" + retentionPeriod + "]");
- }
-
- switch (materialized.storeType()) {
- case IN_MEMORY:
- supplier = Stores.inMemoryWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
- );
- break;
- case ROCKS_DB:
- supplier = emitStrategy.type() ==
StrategyType.ON_WINDOW_CLOSE ?
-
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false,
- false
- ) :
- Stores.persistentTimestampedWindowStore(
- materialized.storeName(),
- Duration.ofMillis(retentionPeriod),
- Duration.ofMillis(windows.size()),
- false
- );
- break;
- default:
- throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
- }
- }
-
- final StoreBuilder<TimestampedWindowStore<K, VR>> builder =
Stores.timestampedWindowStoreBuilder(
- supplier,
- materialized.keySerde(),
- materialized.valueSerde()
- );
-
- if (materialized.loggingEnabled()) {
- builder.withLoggingEnabled(materialized.logConfig());
- } else {
- builder.withLoggingDisabled();
- }
-
- if (materialized.cachingEnabled()) {
- builder.withCachingEnabled();
- }
- return new StoreBuilderWrapper(builder);
- }
-
private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer)
{
return (aggKey, value, aggregate) -> aggregate == null ? value :
reducer.apply(aggregate, value);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
new file mode 100644
index 00000000000..349cca7985d
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowStoreMaterializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import java.time.Duration;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+
+public class WindowStoreMaterializer<K, V> extends MaterializedStoreFactory<K,
V, WindowStore<Bytes, byte[]>> {
+
+ private final Windows<?> windows;
+ private final EmitStrategy emitStrategy;
+ private final long retentionPeriod;
+
+ public WindowStoreMaterializer(
+ final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>>
materialized,
+ final Windows<?> windows,
+ final EmitStrategy emitStrategy
+ ) {
+ super(materialized);
+ this.windows = windows;
+ this.emitStrategy = emitStrategy;
+
+ retentionPeriod = retentionPeriod();
+
+ if ((windows.size() + windows.gracePeriodMs()) > retentionPeriod) {
+ throw new IllegalArgumentException("The retention period of the
window store "
+ + materialized.storeName() + " must be no smaller than its
window size plus the grace period."
+ + " Got size=[" + windows.size() + "],"
+ + " grace=[" + windows.gracePeriodMs() + "],"
+ + " retention=[" + retentionPeriod + "]");
+ }
+ }
+
+ @Override
+ public StateStore build() {
+ WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)
materialized.storeSupplier();
+ if (supplier == null) {
+ switch (defaultStoreType) {
+ case IN_MEMORY:
+ supplier = Stores.inMemoryWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false
+ );
+ break;
+ case ROCKS_DB:
+ supplier = emitStrategy.type() ==
EmitStrategy.StrategyType.ON_WINDOW_CLOSE ?
+
RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false,
+ false
+ ) :
+ Stores.persistentTimestampedWindowStore(
+ materialized.storeName(),
+ Duration.ofMillis(retentionPeriod),
+ Duration.ofMillis(windows.size()),
+ false
+ );
+ break;
+ default:
+ throw new IllegalStateException("Unknown store type: " +
materialized.storeType());
+ }
+ }
+
+ final StoreBuilder<TimestampedWindowStore<K, V>> builder =
Stores.timestampedWindowStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde()
+ );
+
+ if (materialized.loggingEnabled()) {
+ builder.withLoggingEnabled(materialized.logConfig());
+ } else {
+ builder.withLoggingDisabled();
+ }
+
+ if (materialized.cachingEnabled()) {
+ builder.withCachingEnabled();
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public long retentionPeriod() {
+ return materialized.retention() != null
+ ? materialized.retention().toMillis()
+ : windows.size() + windows.gracePeriodMs();
+ }
+
+ @Override
+ public long historyRetention() {
+ throw new IllegalStateException(
+ "historyRetention is not supported when not a versioned
store");
+ }
+
+ @Override
+ public boolean isWindowStore() {
+ return true;
+ }
+
+ @Override
+ public boolean isVersionedStore() {
+ return false;
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 9f7bc0206c6..87dc9301542 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -16,29 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
-import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
@@ -60,6 +37,30 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
+import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
+
public class InternalTopologyBuilder {
public InternalTopologyBuilder() {