ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1377093727
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";
     public static final String IN_MEMORY = "in_memory";
+    /** {@code dsl.store.suppliers.class } */
+    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
+    public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";

Review Comment:
nit: the doc string and default can (and probably should) just be `private`. Not a big deal, but it keeps them out of the official public API and helps avoid awkward/annoying trivial things like having to deprecate all of these as well, not being able to move/rename them if necessary, you get the idea.

##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -209,19 +219,22 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
-        if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
-            storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        // prefer dsl.store.suppliers.class to default.dsl.store and prefer topology overrides over
+        // the global streams configuration
+        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides)) {
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
+            log.info("Topology {} is overriding {} to {}", topologyName, DSL_STORE_SUPPLIERS_CLASS_CONFIG, storeSuppliers);
+        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
+            final String storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = storeType.equals(IN_MEMORY) ? Materialized.StoreType.IN_MEMORY : Materialized.StoreType.ROCKS_DB;
             log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
         } else {
-            storeType = globalAppConfigs.getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));

Review Comment:
I feel like we're missing a branch here -- if someone is using the old `default.dsl.store` but only sets it in their global app configs (i.e. via `new KafkaStreams(...)`) and sets neither of these configs in their topology configs (i.e. via `new StreamsBuilder(...)`), then we should still respect the old config and fall back on the value in the global app configs.

##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";

Review Comment:
IIRC we decided to deprecate the old config, in which case we should make sure to add a `@Deprecated` tag to the config name and these two options. I guess since the doc string is public it should technically also be deprecated...

##########
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java:
##########
@@ -97,21 +128,21 @@ protected Materialized(final Materialized<K, V, S> materialized) {
         this.cachingEnabled = materialized.cachingEnabled;
         this.topicConfig = materialized.topicConfig;
         this.retention = materialized.retention;
-        this.storeType = materialized.storeType;
+        this.storeSuppliers = materialized.storeSuppliers;
     }
     /**
-     * Materialize a {@link StateStore} with the given {@link StoreType}.
+     * Materialize a {@link StateStore} with the given {@link DslStoreSuppliers}.
      *
-     * @param storeType the type of the state store
-     * @param <K> key type of the store
-     * @param <V> value type of the store
-     * @param <S> type of the {@link StateStore}
+     * @param storeSuppliers the type of the state store
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @param <S> type of the {@link StateStore}
      * @return a new {@link Materialized} instance with the given storeName
      */
-    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType) {
-        Objects.requireNonNull(storeType, "store type can't be null");
-        return new Materialized<>(storeType);
+    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers) {

Review Comment:
Just to clarify (for myself as well as anyone else reviewing this), we are able to simply swap in the new `DslStoreSuppliers` parameter -- instead of deprecating this method and adding a new API -- because source-code compatibility is maintained by `StoreType` now implementing `DslStoreSuppliers`.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java:
##########
@@ -106,7 +108,7 @@ public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
             sessionMerger);
     }
-    private StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {
+    private StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {
         SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {

Review Comment:
ditto here -- seems like we can simplify things to make the `supplier` variable final. I'm going to stop commenting on this because I suspect the pattern repeats a lot, so I'll leave it up to you to make a pass over the rest.
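A minimal sketch of the kind of refactor being asked for, under loud assumptions: `SessionSupplier`, `DslSuppliers`, `IN_MEMORY` and `resolveSupplier` below are hypothetical stand-ins written for illustration only, not the real `SessionBytesStoreSupplier` / `DslStoreSuppliers` API (whose method signatures are not shown in this diff). The idea is that once the fallback lives in a helper that returns on every path, the call site assigns `supplier` exactly once and can declare it `final`.

```java
import java.util.Objects;

public class FinalSupplierSketch {

    // Hypothetical stand-in for SessionBytesStoreSupplier (not the real Kafka interface).
    interface SessionSupplier {
        String name();
    }

    // Hypothetical stand-in for DslStoreSuppliers; the real method signature may differ.
    interface DslSuppliers {
        SessionSupplier sessionStore(String storeName);
    }

    // Toy default that just tags the store name, for illustration only.
    static final DslSuppliers IN_MEMORY = storeName -> () -> "in-memory:" + storeName;

    // An explicitly supplied store wins; otherwise fall back to the configured
    // DSL suppliers. Every path returns exactly once, so the caller's local can be
    // final instead of being reassigned inside an `if (supplier == null)` block.
    static SessionSupplier resolveSupplier(final SessionSupplier explicitSupplier,
                                           final DslSuppliers dslSuppliers,
                                           final String storeName) {
        if (explicitSupplier != null) {
            return explicitSupplier;
        }
        return Objects.requireNonNull(dslSuppliers, "dslSuppliers can't be null")
                .sessionStore(storeName);
    }

    public static void main(final String[] args) {
        // Fallback path: no explicit supplier, so the DSL default is used.
        final SessionSupplier fallback = resolveSupplier(null, IN_MEMORY, "session-agg");
        // Explicit path: the user's supplier is returned untouched.
        final SessionSupplier explicit = resolveSupplier(() -> "custom", IN_MEMORY, "session-agg");
        System.out.println(fallback.name() + " " + explicit.name()); // prints "in-memory:session-agg custom"
    }
}
```

When the fallback is a one-liner, a ternary (`final X supplier = explicit != null ? explicit : fallback;`) gets the same single-assignment property without a helper method.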
And I know it feels petty, but given that the logic around initializing and properly configuring the default store suppliers is relatively complex, I actually do believe there's value in trying to make things as straightforward as possible, including using `final` variables for things like this.

##########
streams/src/main/java/org/apache/kafka/streams/state/BuiltinDslStoreSuppliers.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltinDslStoreSuppliers {

Review Comment:
nit: seems like we capitalize the "I" in "BuiltIn" for other public APIs that have this in the name -- not a huge deal, but probably best to be consistent.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -46,16 +47,9 @@ public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<
     public StoreBuilder<?> materialize() {
         KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryKeyValueStore(materialized.storeName());
-                    break;
-                case ROCKS_DB:
-                    supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName());
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + materialized.storeType());
-            }
+            supplier = materialized.storeSuppliers.keyValueStore(

Review Comment:
nit: since we simplified this part of the code so much, can we go one step further and refactor so that the `supplier` variable is final?

##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";
     public static final String IN_MEMORY = "in_memory";
+    /** {@code dsl.store.suppliers.class } */
+    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
+    public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
+    public static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltinDslStoreSuppliers.RocksDbDslStoreSuppliers.class;

Review Comment:
Frankly idk how much it matters, or even if it does at all, but generally we use the class name (i.e. a `String`) for configs whenever possible, e.g. for defaults. It's just easier to work with strings (and I have a small nagging sense that we may autogenerate docs based on the code here, which might struggle with the `Class` variant vs the `String` class name... but I'm not 100% sure).

##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -117,7 +127,7 @@ public class TopologyConfig extends AbstractConfig {
     public final long cacheSize;
     public final long maxTaskIdleMs;
     public final long taskTimeoutMs;
-    public final String storeType;
+    private final DslStoreSuppliers storeSuppliers;

Review Comment:
why is only this field `private`? (I suspect the answer is that they should all be private but we just use this class as a loose container object that doesn't bother with getters.
Even so, let's just conform to the current pattern and make this public like the rest)

##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -209,19 +219,22 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
-        if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
-            storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        // prefer dsl.store.suppliers.class to default.dsl.store and prefer topology overrides over
+        // the global streams configuration
+        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides)) {
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
+            log.info("Topology {} is overriding {} to {}", topologyName, DSL_STORE_SUPPLIERS_CLASS_CONFIG, storeSuppliers);
+        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
+            final String storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = storeType.equals(IN_MEMORY) ? Materialized.StoreType.IN_MEMORY : Materialized.StoreType.ROCKS_DB;
             log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
         } else {
-            storeType = globalAppConfigs.getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
         }
     }
-    public Materialized.StoreType parseStoreType() {
-        if (storeType.equals(IN_MEMORY)) {
-            return Materialized.StoreType.IN_MEMORY;
-        }
-        return Materialized.StoreType.ROCKS_DB;
+    public DslStoreSuppliers getStoreSuppliers() {

Review Comment:
```suggestion
    public DslStoreSuppliers storeSuppliers() {
```
Style rule (no `get` in Streams getters)

##########
streams/src/main/java/org/apache/kafka/streams/state/BuiltinDslStoreSuppliers.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltinDslStoreSuppliers {
+
+    public static final DslStoreSuppliers ROCKS_DB = new RocksDbDslStoreSuppliers();
+    public static final DslStoreSuppliers IN_MEMORY = new InMemoryDslStoreSuppliers();
+
+    public static class RocksDbDslStoreSuppliers implements DslStoreSuppliers {

Review Comment:
another naming nit: we always capitalize the "B" in "RocksDB" in class names

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org