ableegoldman commented on code in PR #14648:
URL: https://github.com/apache/kafka/pull/14648#discussion_r1377093727


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";
     public static final String IN_MEMORY = "in_memory";
 
+    /** {@code dsl.store.suppliers.class } */
+    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
+    public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";

Review Comment:
   nit: the doc string and default can (and probably should) just be `private`. Not a big deal, but it keeps them out of the official public API and helps avoid awkward/annoying trivial things like having to deprecate all of these as well, or not being able to move/rename them if necessary. You get the idea.
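
   A minimal sketch of this suggestion, assuming a trimmed-down stand-in class `StreamsConfigSketch` (hypothetical, not the real `StreamsConfig`): only the config name stays `public`, while the doc string and the default are `private` and used only inside the class. Writing the default as a class-name `String` is also an assumption made for the sketch.

```java
import java.lang.reflect.Modifier;

// Hypothetical, trimmed-down stand-in for StreamsConfig (not the real class).
class StreamsConfigSketch {
    // The config name is what users reference in their code, so it stays public.
    /** {@code dsl.store.suppliers.class} */
    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";

    // Doc string and default are implementation details: keeping them private means they
    // never become public API that later needs deprecating or cannot be moved or renamed.
    private static final String DSL_STORE_SUPPLIERS_CLASS_DOC =
        "Defines which store implementations to plug in to DSL operators. Must implement the "
            + "<code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
    // Assumption: the default is written as the binary name of the PR's RocksDB suppliers class.
    private static final String DSL_STORE_SUPPLIERS_CLASS_DEFAULT =
        "org.apache.kafka.streams.state.BuiltinDslStoreSuppliers$RocksDbDslStoreSuppliers";

    private StreamsConfigSketch() { }

    // The private constants are still usable inside the class, e.g. where the config is defined.
    static String describe() {
        return DSL_STORE_SUPPLIERS_CLASS_CONFIG + " (default: " + DSL_STORE_SUPPLIERS_CLASS_DEFAULT
            + "): " + DSL_STORE_SUPPLIERS_CLASS_DOC;
    }

    // Reports whether a constant is part of the class's public surface.
    static boolean isPublicField(final String name) {
        try {
            return Modifier.isPublic(StreamsConfigSketch.class.getDeclaredField(name).getModifiers());
        } catch (final NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + name, e);
        }
    }
}
```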



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -209,19 +219,22 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
 
-        if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
-            storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        // prefer dsl.store.suppliers.class to default.dsl.store and prefer topology overrides over
+        // the global streams configuration
+        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides)) {
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
+            log.info("Topology {} is overriding {} to {}", topologyName, DSL_STORE_SUPPLIERS_CLASS_CONFIG, storeSuppliers);
+        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
+            final String storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = storeType.equals(IN_MEMORY) ? Materialized.StoreType.IN_MEMORY : Materialized.StoreType.ROCKS_DB;
             log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
         } else {
-            storeType = globalAppConfigs.getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));

Review Comment:
   I feel like we're missing a branch here -- if someone is using the old `default.dsl.store` but only sets it in their global app configs (i.e. via `new KafkaStreams(...)`) and sets neither of these configs in their topology configs (i.e. via `new StreamsBuilder(...)`), then we should still respect the old config and fall back on its value from the global app configs.
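
   A rough sketch of the full precedence order with that fallback added. Everything here is a simplified, hypothetical stand-in (`StoreSuppliersStub`, `BuiltInStoreType`, `DslStoreResolver`), and it assumes each map holds only explicitly set values. In the real `TopologyConfig`, telling "explicitly set" apart from "defaulted" on the global config might mean looking at `globalAppConfigs.originals()`, but that is an assumption not checked against this PR.

```java
import java.util.Map;

// Simplified, hypothetical stand-ins for the PR's types (not the Kafka classes).
interface StoreSuppliersStub { }

enum BuiltInStoreType implements StoreSuppliersStub { ROCKS_DB, IN_MEMORY }

// A user-provided implementation, as configured through dsl.store.suppliers.class.
class CustomStoreSuppliers implements StoreSuppliersStub { }

final class DslStoreResolver {
    static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
    static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";
    static final String IN_MEMORY = "in_memory";

    private DslStoreResolver() { }

    // Both maps are assumed to hold only explicitly set values (no defaults filled in).
    static StoreSuppliersStub resolve(final Map<String, Object> topologyOverrides,
                                      final Map<String, Object> globalConfigs) {
        // Steps 1-2: topology overrides, new config first, then the old one.
        final StoreSuppliersStub fromTopology = fromExplicitConfigs(topologyOverrides);
        if (fromTopology != null) {
            return fromTopology;
        }
        // Steps 3-4: global app configs. Step 4 (old default.dsl.store set only globally)
        // is the fallback the review comment asks for.
        final StoreSuppliersStub fromGlobal = fromExplicitConfigs(globalConfigs);
        if (fromGlobal != null) {
            return fromGlobal;
        }
        // Step 5: nothing set anywhere, so use the RocksDB default.
        return BuiltInStoreType.ROCKS_DB;
    }

    private static StoreSuppliersStub fromExplicitConfigs(final Map<String, Object> configs) {
        if (configs.containsKey(DSL_STORE_SUPPLIERS_CLASS_CONFIG)) {
            return instantiate((Class<?>) configs.get(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
        }
        if (configs.containsKey(DEFAULT_DSL_STORE_CONFIG)) {
            return IN_MEMORY.equals(configs.get(DEFAULT_DSL_STORE_CONFIG))
                ? BuiltInStoreType.IN_MEMORY
                : BuiltInStoreType.ROCKS_DB;
        }
        return null;
    }

    private static StoreSuppliersStub instantiate(final Class<?> clazz) {
        try {
            return (StoreSuppliersStub) clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + clazz.getName(), e);
        }
    }
}
```

   With nothing set at the topology level and only `default.dsl.store=in_memory` in the global configs, this sketch returns `IN_MEMORY` instead of silently using the RocksDB default.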



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";

Review Comment:
   IIRC we decided to deprecate the old config, in which case we should make sure to add a `@Deprecated` annotation to the config name and these two options. I guess since the doc string is public, it should technically also be deprecated...
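
   A minimal sketch of what the deprecation could look like, using a hypothetical stand-in class rather than the real `StreamsConfig`; the Javadoc wording is illustrative only. If the doc-string constant stays public, it would get the same annotation.

```java
import java.lang.reflect.Field;

// Hypothetical excerpt of StreamsConfig after deprecating the old config (not the real class).
class DeprecatedStoreConfigSketch {
    /**
     * {@code default.dsl.store}
     * @deprecated use {@link #DSL_STORE_SUPPLIERS_CLASS_CONFIG} instead
     */
    @Deprecated
    public static final String DEFAULT_DSL_STORE_CONFIG = "default.dsl.store";

    /** @deprecated use {@link #DSL_STORE_SUPPLIERS_CLASS_CONFIG} instead */
    @Deprecated
    public static final String ROCKS_DB = "rocksDB";

    /** @deprecated use {@link #DSL_STORE_SUPPLIERS_CLASS_CONFIG} instead */
    @Deprecated
    public static final String IN_MEMORY = "in_memory";

    /** {@code dsl.store.suppliers.class} */
    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";

    private DeprecatedStoreConfigSketch() { }

    // @Deprecated has runtime retention, so the annotation can be checked reflectively.
    static boolean isDeprecated(final String fieldName) {
        try {
            final Field field = DeprecatedStoreConfigSketch.class.getField(fieldName);
            return field.isAnnotationPresent(Deprecated.class);
        } catch (final NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }
}
```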



##########
streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java:
##########
@@ -97,21 +128,21 @@ protected Materialized(final Materialized<K, V, S> materialized) {
         this.cachingEnabled = materialized.cachingEnabled;
         this.topicConfig = materialized.topicConfig;
         this.retention = materialized.retention;
-        this.storeType = materialized.storeType;
+        this.storeSuppliers = materialized.storeSuppliers;
     }
 
     /**
-     * Materialize a {@link StateStore} with the given {@link StoreType}.
+     * Materialize a {@link StateStore} with the given {@link DslStoreSuppliers}.
      *
-     * @param storeType  the type of the state store
-     * @param <K>       key type of the store
-     * @param <V>       value type of the store
-     * @param <S>       type of the {@link StateStore}
+     * @param storeSuppliers  the type of the state store
+     * @param <K>             key type of the store
+     * @param <V>             value type of the store
+     * @param <S>             type of the {@link StateStore}
      * @return a new {@link Materialized} instance with the given storeName
      */
-    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StoreType storeType) {
-        Objects.requireNonNull(storeType, "store type can't be null");
-        return new Materialized<>(storeType);
+    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final DslStoreSuppliers storeSuppliers) {

Review Comment:
   Just to clarify (for myself as well as anyone else reviewing this): we are able to simply swap in the new `DslStoreSuppliers` parameter -- instead of deprecating this and adding a new API -- because source-code compatibility is maintained by the new `StoreType` inheritance (`StoreType` now implements `DslStoreSuppliers`, so existing calls like `Materialized.as(StoreType.IN_MEMORY)` still compile).
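
   A small self-contained sketch of why the swap is source compatible, with hypothetical stand-ins `DslStoreSuppliersStandIn` and `MaterializedSketch` (not the Kafka types): because the enum implements the new interface, a call written against the old `as(StoreType)` signature still compiles against `as(DslStoreSuppliersStandIn)`.

```java
import java.util.Objects;

// Hypothetical stand-in for the new DslStoreSuppliers interface (not the Kafka type).
interface DslStoreSuppliersStandIn {
    String describe();
}

// Hypothetical, stripped-down stand-in for Materialized.
final class MaterializedSketch {
    // As in the PR, the existing enum now implements the new interface.
    enum StoreType implements DslStoreSuppliersStandIn {
        ROCKS_DB, IN_MEMORY;

        @Override
        public String describe() {
            return name();
        }
    }

    private final DslStoreSuppliersStandIn storeSuppliers;

    private MaterializedSketch(final DslStoreSuppliersStandIn storeSuppliers) {
        this.storeSuppliers = storeSuppliers;
    }

    // Parameter widened from StoreType to the interface: a caller written against the old
    // signature, e.g. as(StoreType.IN_MEMORY), compiles unchanged.
    static MaterializedSketch as(final DslStoreSuppliersStandIn storeSuppliers) {
        Objects.requireNonNull(storeSuppliers, "store type can't be null");
        return new MaterializedSketch(storeSuppliers);
    }

    DslStoreSuppliersStandIn storeSuppliers() {
        return storeSuppliers;
    }
}
```

   Note that this argument covers source compatibility only: the parameter type is part of the compiled method descriptor, so code compiled against the old `as(StoreType)` signature would still need to be recompiled.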



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImpl.java:
##########
@@ -106,7 +108,7 @@ public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
             sessionMerger);
     }
 
-    private  StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {
+    private StoreBuilder<SessionStore<K, V>> materialize(final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> materialized) {
         SessionBytesStoreSupplier supplier = (SessionBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {

Review Comment:
   ditto here -- seems like we can simplify things to make the `supplier` variable final.
   
   I'm going to stop commenting on this because I suspect the pattern repeats a lot, so I'll leave it up to you to make a pass over the rest. And I know it feels petty, but given that the logic around initializing and properly configuring the default store suppliers is relatively complex, I actually do believe there's value in making things as straightforward as possible, including using `final` variables for things like this.
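
   One way to get a `final` local here, sketched with hypothetical stand-in types (the real method works with `SessionBytesStoreSupplier` and `MaterializedInternal`): move the "user supplier or default" decision into a small helper that returns early, so the variable is assigned exactly once.

```java
import java.util.Objects;

// Hypothetical stand-ins; only the shape of the refactoring matters here.
interface SessionSupplierStub {
    String name();
}

interface SessionDslSuppliersStub {
    SessionSupplierStub sessionStore(String storeName);
}

final class SessionMaterializerSketch {
    private final SessionSupplierStub userSupplier;     // null if the user did not provide one
    private final SessionDslSuppliersStub dslSuppliers; // configured default suppliers
    private final String storeName;

    SessionMaterializerSketch(final SessionSupplierStub userSupplier,
                              final SessionDslSuppliersStub dslSuppliers,
                              final String storeName) {
        this.userSupplier = userSupplier;
        this.dslSuppliers = Objects.requireNonNull(dslSuppliers);
        this.storeName = Objects.requireNonNull(storeName);
    }

    SessionSupplierStub materialize() {
        // Assigned exactly once, so the local can be final.
        final SessionSupplierStub supplier = supplierOrDefault();
        // ...the real method would go on to build the StoreBuilder from `supplier`.
        return supplier;
    }

    // Early returns replace the "declare, then maybe reassign" pattern.
    private SessionSupplierStub supplierOrDefault() {
        if (userSupplier != null) {
            return userSupplier;
        }
        return dslSuppliers.sessionStore(storeName);
    }
}
```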



##########
streams/src/main/java/org/apache/kafka/streams/state/BuiltinDslStoreSuppliers.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltinDslStoreSuppliers {

Review Comment:
   nit: it seems like we capitalize the "I" in "BuiltIn" for other public APIs that have this in the name (i.e. `BuiltInDslStoreSuppliers` here). Not a huge deal, but probably best to be consistent.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java:
##########
@@ -46,16 +47,9 @@ public KeyValueStoreMaterializer(final MaterializedInternal<K, V, KeyValueStore<
     public StoreBuilder<?> materialize() {
         KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier();
         if (supplier == null) {
-            switch (materialized.storeType()) {
-                case IN_MEMORY:
-                    supplier = Stores.inMemoryKeyValueStore(materialized.storeName());
-                    break;
-                case ROCKS_DB:
-                    supplier = Stores.persistentTimestampedKeyValueStore(materialized.storeName());
-                    break;
-                default:
-                    throw new IllegalStateException("Unknown store type: " + materialized.storeType());
-            }
+            supplier = materialized.storeSuppliers.keyValueStore(

Review Comment:
   nit: since we simplified this part of the code so much, can we go one step 
further and refactor so that the `supplier` variable is final?
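
   For the key-value case, a conditional expression is probably the simplest way to make `supplier` final. This is a sketch with hypothetical stand-in types; since the arguments to `keyValueStore(` are cut off in the diff, passing just a store name is an assumption for illustration.

```java
import java.util.Objects;

// Hypothetical stand-ins: the PR's keyValueStore(...) arguments are cut off in the diff,
// so a store name is assumed here purely for illustration.
interface KeyValueSupplierStub {
    String name();
}

interface KeyValueDslSuppliersStub {
    KeyValueSupplierStub keyValueStore(String storeName);
}

final class KeyValueMaterializerSketch {
    private KeyValueMaterializerSketch() { }

    // userSupplier may be null; the conditional expression makes the single assignment explicit,
    // and the DSL suppliers are only consulted when no user supplier was given.
    static KeyValueSupplierStub materialize(final KeyValueSupplierStub userSupplier,
                                            final KeyValueDslSuppliersStub dslSuppliers,
                                            final String storeName) {
        final KeyValueSupplierStub supplier = userSupplier != null
            ? userSupplier
            : Objects.requireNonNull(dslSuppliers).keyValueStore(storeName);
        return supplier;
    }
}
```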



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -536,6 +537,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ROCKS_DB = "rocksDB";
     public static final String IN_MEMORY = "in_memory";
 
+    /** {@code dsl.store.suppliers.class } */
+    public static final String DSL_STORE_SUPPLIERS_CLASS_CONFIG = "dsl.store.suppliers.class";
+    public static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
+    public static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltinDslStoreSuppliers.RocksDbDslStoreSuppliers.class;

Review Comment:
   Frankly, idk how much it matters, or even if it does at all, but generally we use the class name (i.e. a `String`) for configs whenever possible, e.g. for defaults. It's just easier to work with strings (and I have a small nagging sense that we may autogenerate docs based on the code here, which might struggle with the `Class` variant vs. the `String` class name... but I'm not 100% sure).
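
   A minimal sketch of the `String` variant, with hypothetical stand-in classes (using the "BuiltIn" and "RocksDB" capitalization): the default is stored as the class's binary name and only turned into a `Class` and an instance when the config is resolved. Whether Kafka's doc generation actually cares is not verified here.

```java
// Hypothetical stand-ins for the built-in suppliers (not the Kafka classes).
interface DslSuppliersStub { }

final class BuiltInSuppliersStub {
    private BuiltInSuppliersStub() { }

    public static class RocksDBSuppliers implements DslSuppliersStub {
        public RocksDBSuppliers() { }
    }
}

final class ConfigDefaultsSketch {
    // The default is the class *name* (a String) rather than a Class<?> literal.
    // getName() returns the binary name, i.e. "...BuiltInSuppliersStub$RocksDBSuppliers"
    // for a nested class, which is the form Class.forName expects.
    static final String DSL_STORE_SUPPLIERS_CLASS_DEFAULT =
        BuiltInSuppliersStub.RocksDBSuppliers.class.getName();

    private ConfigDefaultsSketch() { }

    // Turns a class-name config value into an instance.
    static DslSuppliersStub newInstance(final String className) {
        try {
            final Class<?> clazz = Class.forName(className);
            return (DslSuppliersStub) clazz.getDeclaredConstructor().newInstance();
        } catch (final ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```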



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -117,7 +127,7 @@ public class TopologyConfig extends AbstractConfig {
     public final long cacheSize;
     public final long maxTaskIdleMs;
     public final long taskTimeoutMs;
-    public final String storeType;
+    private final DslStoreSuppliers storeSuppliers;

Review Comment:
   why is only this field `private`?
   
   (I suspect the answer is that they should all be private, but we just use this class as a loose container object that doesn't bother with getters. Even so, let's just conform to the current pattern and make this public like the rest.)



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -209,19 +219,22 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
             deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class);
         }
 
-        if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
-            storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+        // prefer dsl.store.suppliers.class to default.dsl.store and prefer topology overrides over
+        // the global streams configuration
+        if (isTopologyOverride(DSL_STORE_SUPPLIERS_CLASS_CONFIG, topologyOverrides)) {
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
+            log.info("Topology {} is overriding {} to {}", topologyName, DSL_STORE_SUPPLIERS_CLASS_CONFIG, storeSuppliers);
+        } else if (isTopologyOverride(DEFAULT_DSL_STORE_CONFIG, topologyOverrides)) {
+            final String storeType = getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = storeType.equals(IN_MEMORY) ? Materialized.StoreType.IN_MEMORY : Materialized.StoreType.ROCKS_DB;
             log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DSL_STORE_CONFIG, storeType);
         } else {
-            storeType = globalAppConfigs.getString(DEFAULT_DSL_STORE_CONFIG);
+            storeSuppliers = (DslStoreSuppliers) Utils.newInstance(globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG));
         }
     }
 
-    public Materialized.StoreType parseStoreType() {
-        if (storeType.equals(IN_MEMORY)) {
-            return Materialized.StoreType.IN_MEMORY;
-        }
-        return Materialized.StoreType.ROCKS_DB;
+    public DslStoreSuppliers getStoreSuppliers() {

Review Comment:
   ```suggestion
       public DslStoreSuppliers storeSuppliers() {
   ```
   
   Style rule (no `get` in Streams getters)



##########
streams/src/main/java/org/apache/kafka/streams/state/BuiltinDslStoreSuppliers.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.kstream.EmitStrategy;
+import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
+
+public class BuiltinDslStoreSuppliers {
+
+    public static final DslStoreSuppliers ROCKS_DB = new RocksDbDslStoreSuppliers();
+    public static final DslStoreSuppliers IN_MEMORY = new InMemoryDslStoreSuppliers();
+
+    public static class RocksDbDslStoreSuppliers implements DslStoreSuppliers {

Review Comment:
   another naming nit: we always capitalize the "B" in "RocksDB" in class names (i.e. `RocksDBDslStoreSuppliers`).


