[ 
https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500961#comment-16500961
 ] 

ASF GitHub Bot commented on KAFKA-6813:
---------------------------------------

guozhangwang closed pull request #5075: KAFKA-6813: return to double-counting for count topology names
URL: https://github.com/apache/kafka/pull/5075
 
 
   

This is a foreign pull request, merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index ead8a7663ab..517104da323 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -224,9 +224,9 @@
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
         
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        return internalStreamsBuilder.table(topic,
-                                            consumedInternal,
-                                            new 
MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+        return internalStreamsBuilder.table(topic, consumedInternal, 
materializedInternal);
     }
 
     /**
@@ -273,12 +273,10 @@
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
-        return internalStreamsBuilder.table(topic,
-                                            consumedInternal,
-                                            new MaterializedInternal<>(
-                                                    Materialized.<K, V, 
KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()),
-                                                    internalStreamsBuilder,
-                                                    topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
+            new 
MaterializedInternal<>(Materialized.with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()));
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+        return internalStreamsBuilder.table(topic, consumedInternal, 
materializedInternal);
     }
 
     /**
@@ -302,8 +300,9 @@
                                                   final Materialized<K, V, 
KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, 
internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+
         return internalStreamsBuilder.table(topic,
                                             new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                
  materializedInternal.valueSerde())),
@@ -331,14 +330,11 @@
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(consumed, "consumed can't be null");
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materialized =
-                new MaterializedInternal<>(
-                        Materialized.<K, V, KeyValueStore<Bytes, 
byte[]>>with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
-                        internalStreamsBuilder,
-                        topic + "-");
-
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
+                new MaterializedInternal<>(Materialized.<K, V, 
KeyValueStore<Bytes, byte[]>>with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()));
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
 
-        return internalStreamsBuilder.globalTable(topic, consumedInternal, 
materialized);
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, 
materializedInternal);
     }
 
     /**
@@ -402,9 +398,10 @@
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
         // always use the serdes from consumed
         
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
-        return internalStreamsBuilder.globalTable(topic,
-                                                  consumedInternal,
-                                                  new 
MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+
+        return internalStreamsBuilder.globalTable(topic, consumedInternal, 
materializedInternal);
     }
 
     /**
@@ -436,8 +433,9 @@
                                                               final 
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
-                new MaterializedInternal<>(materialized, 
internalStreamsBuilder, topic + "-");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+
         return internalStreamsBuilder.globalTable(topic,
                                                   new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
                                                                                
        materializedInternal.valueSerde())),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 3a9f9197ac7..74f930ee357 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -74,8 +74,10 @@
                                final Materialized<K, V, KeyValueStore<Bytes, 
byte[]>> materialized) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
REDUCE_NAME);
+
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -97,8 +99,9 @@
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -117,14 +120,26 @@
 
     @Override
     public KTable<K, Long> count() {
-        return count(Materialized.<K, Long, KeyValueStore<Bytes, 
byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, 
KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it 
for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    private KTable<K, Long> doCount(final Materialized<K, Long, 
KeyValueStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -133,9 +148,9 @@
         }
 
         return doAggregate(
-                new KStreamAggregate<>(materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-                AGGREGATE_NAME,
-                materializedInternal);
+            new KStreamAggregate<>(materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+            AGGREGATE_NAME,
+            materializedInternal);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index db119f30fd5..49f258bf503 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -128,8 +128,9 @@ private void buildAggregate(final ProcessorSupplier<K, 
Change<V>> aggregateSuppl
         Objects.requireNonNull(adder, "adder can't be null");
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -150,8 +151,9 @@ private void buildAggregate(final ProcessorSupplier<K, 
Change<V>> aggregateSuppl
 
     @Override
     public KTable<K, Long> count(final Materialized<K, Long, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -182,8 +184,9 @@ private void buildAggregate(final ProcessorSupplier<K, 
Change<V>> aggregateSuppl
         Objects.requireNonNull(subtractor, "subtractor can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
-                new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index bcd31bba16e..21c15058bb9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -154,7 +154,10 @@ public String queryableStoreName() {
                                final Materialized<K, V, KeyValueStore<Bytes, 
byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized, 
builder, FILTER_NAME), false);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
+
+        return doFilter(predicate, materializedInternal, false);
     }
 
     @Override
@@ -168,7 +171,10 @@ public String queryableStoreName() {
                                   final Materialized<K, V, 
KeyValueStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(predicate, "predicate can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doFilter(predicate, new MaterializedInternal<>(materialized, 
builder, FILTER_NAME), true);
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, FILTER_NAME);
+
+        return doFilter(predicate, materializedInternal, true);
     }
 
     private <VR> KTable<K, VR> doMapValues(final ValueMapperWithKey<? super K, 
? super V, ? extends VR> mapper,
@@ -210,7 +216,10 @@ public String queryableStoreName() {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        return doMapValues(withKey(mapper), new 
MaterializedInternal<>(materialized, builder, MAPVALUES_NAME));
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
MAPVALUES_NAME);
+
+        return doMapValues(withKey(mapper), materializedInternal);
     }
 
     @Override
@@ -219,7 +228,10 @@ public String queryableStoreName() {
         Objects.requireNonNull(mapper, "mapper can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        return doMapValues(mapper, new MaterializedInternal<>(materialized, 
builder, MAPVALUES_NAME));
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
MAPVALUES_NAME);
+
+        return doMapValues(mapper, materializedInternal);
     }
 
     @Override
@@ -233,8 +245,10 @@ public String queryableStoreName() {
                                               final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized,
                                               final String... stateStoreNames) 
{
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doTransformValues(transformerSupplier,
-            new MaterializedInternal<>(materialized, builder, 
TRANSFORMVALUES_NAME), stateStoreNames);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
TRANSFORMVALUES_NAME);
+
+        return doTransformValues(transformerSupplier, materializedInternal, 
stateStoreNames);
     }
 
     private <VR> KTable<K, VR> doTransformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
transformerSupplier,
@@ -304,7 +318,10 @@ public V apply(final K key, final Change<V> change) {
         Objects.requireNonNull(other, "other can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized, 
builder, MERGE_NAME), false, false);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+
+        return doJoin(other, joiner, materializedInternal, false, false);
     }
 
     @Override
@@ -317,7 +334,10 @@ public V apply(final K key, final Change<V> change) {
     public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                             final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
                                             final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoin(other, joiner, new MaterializedInternal<>(materialized, 
builder, MERGE_NAME), true, true);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+
+        return doJoin(other, joiner, materializedInternal, true, true);
     }
 
     @Override
@@ -330,11 +350,9 @@ public V apply(final K key, final Change<V> change) {
     public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                            final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
                                            final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized) {
-        return doJoin(other,
-                      joiner,
-                      new MaterializedInternal<>(materialized, builder, 
MERGE_NAME),
-                      true,
-                      false);
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, MERGE_NAME);
+        return doJoin(other, joiner, materializedInternal, true, false);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index c933b8687f5..5361e48c1d8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -25,18 +25,17 @@
 
 public class MaterializedInternal<K, V, S extends StateStore> extends 
Materialized<K, V, S> {
 
-    private final boolean queryable;
+    private final boolean queriable;
 
-
-    public MaterializedInternal(final Materialized<K, V, S> materialized,
-                                final InternalNameProvider nameProvider,
-                                final String generatedStorePrefix) {
+    public MaterializedInternal(final Materialized<K, V, S> materialized) {
         super(materialized);
+        queriable = storeName() != null;
+    }
+
+    public void generateStoreNameIfNeeded(final InternalNameProvider 
nameProvider,
+                                          final String generatedStorePrefix) {
         if (storeName() == null) {
-            queryable = false;
             storeName = nameProvider.newStoreName(generatedStorePrefix);
-        } else {
-            queryable = true;
         }
     }
 
@@ -63,7 +62,7 @@ public boolean loggingEnabled() {
         return loggingEnabled;
     }
 
-    public Map<String, String> logConfig() {
+    Map<String, String> logConfig() {
         return topicConfig;
     }
 
@@ -72,6 +71,6 @@ boolean cachingEnabled() {
     }
 
     boolean isQueryable() {
-        return queryable;
+        return queriable;
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
index c29c6566c2f..b3cbacd57f6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java
@@ -68,15 +68,26 @@ public Long apply(final K aggKey, final Long aggOne, final 
Long aggTwo) {
 
     @Override
     public KTable<Windowed<K>, Long> count() {
-        return count(Materialized.<K, Long, SessionStore<Bytes, 
byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, 
SessionStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it 
for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, 
SessionStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, SessionStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -85,10 +96,10 @@ public Long apply(final K aggKey, final Long aggOne, final 
Long aggTwo) {
         }
 
         return (KTable<Windowed<K>, Long>) aggregateBuilder.build(
-                new KStreamSessionWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator, countMerger),
-                AGGREGATE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+            new KStreamSessionWindowAggregate<>(windows, 
materializedInternal.storeName(), aggregateBuilder.countInitializer, 
aggregateBuilder.countAggregator, countMerger),
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable());
     }
 
     @Override
@@ -103,8 +114,8 @@ public Long apply(final K aggKey, final Long aggOne, final 
Long aggTwo) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
         final Aggregator<K, V, V> reduceAggregator = 
aggregatorForReducer(reducer);
-        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
REDUCE_NAME);
+        final MaterializedInternal<K, V, SessionStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -136,8 +147,9 @@ public Long apply(final K aggKey, final Long aggOne, final 
Long aggTwo) {
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(sessionMerger, "sessionMerger can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, SessionStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
index d1e5a175847..4f5301b53b2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java
@@ -24,9 +24,9 @@
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.TimeWindowedKStream;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -63,15 +63,27 @@
 
     @Override
     public KTable<Windowed<K>, Long> count() {
-        return count(Materialized.<K, Long, WindowStore<Bytes, 
byte[]>>with(keySerde, Serdes.Long()));
+        return doCount(Materialized.with(keySerde, Serdes.Long()));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, Long> count(final Materialized<K, Long, 
WindowStore<Bytes, byte[]>> materialized) {
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+
+        // TODO: remove this when we do a topology-incompatible release
+        // we used to burn a topology name here, so we have to keep doing it 
for compatibility
+        if (new MaterializedInternal<>(materialized).storeName() == null) {
+            builder.newStoreName(AGGREGATE_NAME);
+        }
+
+        return doCount(materialized);
+    }
+
+    @SuppressWarnings("unchecked")
+    private KTable<Windowed<K>, Long> doCount(final Materialized<K, Long, 
WindowStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -80,9 +92,9 @@
         }
 
         return (KTable<Windowed<K>, Long>) aggregateBuilder.build(new 
KStreamWindowAggregate<>(windows, materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
-                AGGREGATE_NAME,
-                materialize(materializedInternal),
-                materializedInternal.isQueryable());
+            AGGREGATE_NAME,
+            materialize(materializedInternal),
+            materializedInternal.isQueryable());
     }
 
 
@@ -100,8 +112,8 @@
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
-        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
AGGREGATE_NAME);
+        final MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, 
AGGREGATE_NAME);
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
@@ -122,8 +134,9 @@
         Objects.requireNonNull(reducer, "reducer can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
-        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> 
materializedInternal
-                = new MaterializedInternal<>(materialized, builder, 
REDUCE_NAME);
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> 
materializedInternal = new MaterializedInternal<>(materialized);
+        materializedInternal.generateStoreNameIfNeeded(builder, REDUCE_NAME);
+
         if (materializedInternal.keySerde() == null) {
             materializedInternal.withKeySerde(keySerde);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index a845be3569f..f8f9c7d68a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -40,6 +44,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public class TopologyTest {
@@ -76,12 +81,7 @@ public void shouldNotAllowZeroTopicsWhenAddingSource() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullNameWhenAddingProcessor() {
-        topology.addProcessor(null, new ProcessorSupplier() {
-            @Override
-            public Processor get() {
-                return new MockProcessorSupplier().get();
-            }
-        });
+        topology.addProcessor(null, () -> new MockProcessorSupplier().get());
     }
 
     @Test(expected = NullPointerException.class)
@@ -130,7 +130,7 @@ public void shouldNotAllowToAddSourcesWithSameName() {
         try {
             topology.addSource("source", "topic-2");
             fail("Should throw TopologyException for duplicate source name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test
@@ -139,7 +139,7 @@ public void shouldNotAllowToAddTopicTwice() {
         try {
             topology.addSource("source-2", "topic-1");
             fail("Should throw TopologyException for already used topic");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test
@@ -167,7 +167,7 @@ public void shouldNotAllowToAddProcessorWithSameName() {
         try {
             topology.addProcessor("processor", new MockProcessorSupplier(), "source");
             fail("Should throw TopologyException for duplicate processor name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyException.class)
@@ -187,7 +187,7 @@ public void shouldNotAllowToAddSinkWithSameName() {
         try {
             topology.addSink("sink", "topic-3", "source");
             fail("Should throw TopologyException for duplicate sink name");
-        } catch (TopologyException expected) { }
+        } catch (final TopologyException expected) { }
     }
 
     @Test(expected = TopologyException.class)
@@ -257,7 +257,7 @@ public void shouldNotAllowToAddStoreWithSameName() {
     }
 
     @Test
-    public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
+    public void shouldThrowOnUnassignedStateStoreAccess() {
         final String sourceNodeName = "source";
         final String goodNodeName = "goodGuy";
         final String badNodeName = "badGuy";
@@ -283,7 +283,7 @@ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         } catch (final StreamsException e) {
             final String error = e.toString();
             final String expectedMessage = "org.apache.kafka.streams.errors.StreamsException: failed to initialize processor " + badNodeName;
-            
+
             assertThat(error, equalTo(expectedMessage));
         }
     }
@@ -295,12 +295,12 @@ public void shouldThrowOnUnassignedStateStoreAccess() throws Exception {
         public Processor get() {
             return new Processor() {
                 @Override
-                public void init(ProcessorContext context) {
+                public void init(final ProcessorContext context) {
                     context.getStateStore(STORE_NAME);
                 }
 
                 @Override
-                public void process(Object key, Object value) { }
+                public void process(final Object key, final Object value) { }
 
                 @Override
                 public void close() { }
@@ -324,7 +324,7 @@ public void 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
 
     @Test
     public void shouldDescribeEmptyTopology() {
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -333,9 +333,9 @@ public void singleSourceShouldHaveSingleSubtopology() {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -344,9 +344,9 @@ public void 
singleSourceWithListOfTopicsShouldHaveSingleSubtopology() {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -355,9 +355,9 @@ public void 
singleSourcePatternShouldHaveSingleSubtopology() {
 
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode)));
+                Collections.singleton(expectedSourceNode)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -365,19 +365,19 @@ public void 
multipleSourcesShouldHaveDistinctSubtopologies() {
         final TopologyDescription.Source expectedSourceNode1 = 
addSource("source1", "topic1");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(0,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode1)));
+                Collections.singleton(expectedSourceNode1)));
 
         final TopologyDescription.Source expectedSourceNode2 = 
addSource("source2", "topic2");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(1,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode2)));
+                Collections.singleton(expectedSourceNode2)));
 
         final TopologyDescription.Source expectedSourceNode3 = 
addSource("source3", "topic3");
         expectedDescription.addSubtopology(
             new InternalTopologyBuilder.Subtopology(2,
-                
Collections.<TopologyDescription.Node>singleton(expectedSourceNode3)));
+                Collections.singleton(expectedSourceNode3)));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -390,7 +390,7 @@ public void sourceAndProcessorShouldHaveSingleSubtopology() 
{
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -405,7 +405,7 @@ public void 
sourceAndProcessorWithStateShouldHaveSingleSubtopology() {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
 
@@ -421,7 +421,7 @@ public void 
sourceAndProcessorWithMultipleStatesShouldHaveSingleSubtopology() {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -436,7 +436,7 @@ public void 
sourceWithMultipleProcessorsShouldHaveSingleSubtopology() {
         allNodes.add(expectedProcessorNode2);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -451,7 +451,7 @@ public void 
processorWithMultipleSourcesShouldHaveSingleSubtopology() {
         allNodes.add(expectedProcessorNode);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -480,7 +480,7 @@ public void 
multipleSourcesWithProcessorsShouldHaveDistinctSubtopologies() {
         allNodes3.add(expectedProcessorNode3);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(2, allNodes3));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -509,7 +509,7 @@ public void 
multipleSourcesWithSinksShouldHaveDistinctSubtopologies() {
         allNodes3.add(expectedSinkNode3);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(2, allNodes3));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -540,7 +540,7 @@ public void 
processorsWithSameSinkShouldHaveSameSubtopology() {
         allNodes.add(expectedSinkNode);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
@@ -570,30 +570,303 @@ public void 
processorsWithSharedStateShouldHaveSameSubtopology() {
         allNodes.add(expectedProcessorNode3);
         expectedDescription.addSubtopology(new 
InternalTopologyBuilder.Subtopology(0, allNodes));
 
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
     public void shouldDescribeGlobalStoreTopology() {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore", 
"source", "globalTopic", "processor", 0);
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
     @Test
     public void shouldDescribeMultipleGlobalStoreTopology() {
         addGlobalStoreToTopologyAndExpectedDescription("globalStore1", 
"source1", "globalTopic1", "processor1", 0);
         addGlobalStoreToTopologyAndExpectedDescription("globalStore2", 
"source2", "globalTopic2", "processor2", 1);
-        assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
+        assertThat(topology.describe(), equalTo(expectedDescription));
+    }
+
+    @Test
+    public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
kGroupedStreamNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: 
[count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
kGroupedStreamAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void timeWindowZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
timeWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: 
[count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
timeWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(TimeWindows.of(1))
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void sessionWindowZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000002\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000002 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000001])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
sessionWindowNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000001\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000001 (stores: 
[count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
sessionWindowAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream("input-topic")
+            .groupByKey()
+            .windowedBy(SessionWindows.with(1))
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: 
[input-topic])\n" +
+                "      --> KSTREAM-AGGREGATE-0000000003\n" +
+                "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void tableZeroArgCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count();
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: 
[input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: 
[input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: 
KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000006 (topics: 
[KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000007\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000007 (stores: 
[KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000006\n" +
+                "\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void tableNamedMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count(Materialized.as("count-store"));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: 
[input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: 
[input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000004\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000004 (topic: 
count-store-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000005 (topics: 
[count-store-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000006\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000006 (stores: 
[count-store])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000005\n" +
+                "\n",
+            describe.toString()
+        );
+    }
+
+    @Test
+    public void 
tableAnonymousMaterializedCountShouldPreserveTopologyStructure() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table("input-topic")
+            .groupBy((key, value) -> null)
+            .count(Materialized.with(null, Serdes.Long()));
+        final TopologyDescription describe = builder.build().describe();
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000001 (topics: 
[input-topic])\n" +
+                "      --> KTABLE-SOURCE-0000000002\n" +
+                "    Processor: KTABLE-SOURCE-0000000002 (stores: 
[input-topic-STATE-STORE-0000000000])\n" +
+                "      --> KTABLE-SELECT-0000000003\n" +
+                "      <-- KSTREAM-SOURCE-0000000001\n" +
+                "    Processor: KTABLE-SELECT-0000000003 (stores: [])\n" +
+                "      --> KSTREAM-SINK-0000000005\n" +
+                "      <-- KTABLE-SOURCE-0000000002\n" +
+                "    Sink: KSTREAM-SINK-0000000005 (topic: 
KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition)\n" +
+                "      <-- KTABLE-SELECT-0000000003\n" +
+                "\n" +
+                "  Sub-topology: 1\n" +
+                "    Source: KSTREAM-SOURCE-0000000006 (topics: 
[KTABLE-AGGREGATE-STATE-STORE-0000000004-repartition])\n" +
+                "      --> KTABLE-AGGREGATE-0000000007\n" +
+                "    Processor: KTABLE-AGGREGATE-0000000007 (stores: 
[KTABLE-AGGREGATE-STATE-STORE-0000000004])\n" +
+                "      --> none\n" +
+                "      <-- KSTREAM-SOURCE-0000000006\n" +
+                "\n",
+            describe.toString()
+        );
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final String... sourceTopic) {
         topology.addSource(null, sourceName, null, null, null, sourceTopic);
-        String allSourceTopics = sourceTopic[0];
+        final StringBuilder allSourceTopics = new 
StringBuilder(sourceTopic[0]);
         for (int i = 1; i < sourceTopic.length; ++i) {
-            allSourceTopics += ", " + sourceTopic[i];
+            allSourceTopics.append(", ").append(sourceTopic[i]);
         }
-        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics);
+        return new InternalTopologyBuilder.Source(sourceName, 
allSourceTopics.toString());
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 79bf81e6bb2..63432ffc439 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -33,7 +33,6 @@
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.apache.kafka.test.MockValueJoiner;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -58,12 +57,11 @@
     private final InternalStreamsBuilder builder = new 
InternalStreamsBuilder(new InternalTopologyBuilder());
     private final ConsumedInternal<String, String> consumed = new 
ConsumedInternal<>();
     private final String storePrefix = "prefix-";
-    private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> 
materialized
-            = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("test-store"), builder, storePrefix);
+    private final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new 
MaterializedInternal<>(Materialized.as("test-store"));
 
-    @Before
-    public void setUp() {
+    {
         builder.internalTopologyBuilder.setApplicationId(APP_ID);
+        materialized.generateStoreNameIfNeeded(builder, storePrefix);
     }
 
     @Test
@@ -127,12 +125,10 @@ public boolean test(final String key, final String value) 
{
 
     @Test
     public void 
shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
-        KTable table1 = builder.table("topic2",
-                                      consumed,
-                                      new MaterializedInternal<>(
-                                              Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>with(null, null),
-                                              builder,
-                                              storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final KTable table1 = builder.table("topic2", consumed, 
materializedInternal);
 
         final ProcessorTopology topology = 
builder.internalTopologyBuilder.build(null);
 
@@ -147,38 +143,33 @@ public void 
shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() {
     
     @Test
     public void shouldBuildGlobalTableWithNonQueryableStoreName() {
-        final GlobalKTable<String, String> table1 = builder.globalTable(
-            "topic2",
-            consumed,
-            new MaterializedInternal<>(
-                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(null, null),
-                builder,
-                storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+
+        final GlobalKTable<String, String> table1 = 
builder.globalTable("topic2", consumed, materializedInternal);
 
         assertNull(table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildGlobalTableWithQueryaIbleStoreName() {
-        final GlobalKTable<String, String> table1 = builder.globalTable(
-            "topic2",
-            consumed,
-            new MaterializedInternal<>(
-                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("globalTable"),
-                builder,
-                storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("globalTable"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> table1 = 
builder.globalTable("topic2", consumed, materializedInternal);
 
         assertEquals("globalTable", table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() {
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("globalTable"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
         builder.globalTable("table",
                             consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("globalTable"),
-                                    builder,
-                                    storePrefix));
+            materializedInternal);
 
         final ProcessorTopology topology = 
builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -199,15 +190,18 @@ private void doBuildGlobalTopologyWithAllGlobalTables() {
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() {
-        builder.globalTable("table",
-                            consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("global1"), builder, storePrefix));
-        builder.globalTable("table2",
-                            consumed,
-                            new MaterializedInternal<>(
-                                    Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("global2"), builder, storePrefix));
-
+        {
+            final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.as("global1"));
+            materializedInternal.generateStoreNameIfNeeded(builder, 
storePrefix);
+            builder.globalTable("table", consumed, materializedInternal);
+        }
+        {
+            final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.as("global2"));
+            materializedInternal.generateStoreNameIfNeeded(builder, 
storePrefix);
+            builder.globalTable("table2", consumed, materializedInternal);
+        }
         doBuildGlobalTopologyWithAllGlobalTables();
     }
 
@@ -216,25 +210,22 @@ public void shouldAddGlobalTablesToEachGroup() {
         final String one = "globalTable";
         final String two = "globalTable2";
 
-        final GlobalKTable<String, String> globalTable = 
builder.globalTable("table",
-                                                                             
consumed,
-                                                                             
new MaterializedInternal<>(
-                                                                               
      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one), 
builder, storePrefix));
-        final GlobalKTable<String, String> globalTable2 = 
builder.globalTable("table2",
-                                                                              
consumed,
-                                                                              
new MaterializedInternal<>(
-                                                                               
       Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two), 
builder, storePrefix));
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as(one));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> globalTable = 
builder.globalTable("table", consumed, materializedInternal);
 
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("not-global"), builder, storePrefix);
-        builder.table("not-global", consumed, materialized);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal2 =
+            new MaterializedInternal<>(Materialized.as(two));
+        materializedInternal2.generateStoreNameIfNeeded(builder, storePrefix);
+        final GlobalKTable<String, String> globalTable2 = 
builder.globalTable("table2", consumed, materializedInternal2);
 
-        final KeyValueMapper<String, String, String> kvMapper = new 
KeyValueMapper<String, String, String>() {
-            @Override
-            public String apply(final String key, final String value) {
-                return value;
-            }
-        };
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternalNotGlobal =
+            new MaterializedInternal<>(Materialized.as("not-global"));
+        materializedInternalNotGlobal.generateStoreNameIfNeeded(builder, 
storePrefix);
+        builder.table("not-global", consumed, materializedInternalNotGlobal);
+
+        final KeyValueMapper<String, String, String> kvMapper = (key, value) 
-> value;
 
         final KStream<String, String> stream = 
builder.stream(Collections.singleton("t1"), consumed);
         stream.leftJoin(globalTable, kvMapper, 
MockValueJoiner.TOSTRING_JOINER);
@@ -260,9 +251,10 @@ public String apply(final String key, final String value) {
     public void shouldMapStateStoresToCorrectSourceTopics() {
         final KStream<String, String> playEvents = 
builder.stream(Collections.singleton("events"), consumed);
 
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("table-store"), builder, storePrefix);
-        final KTable<String, String> table = builder.table("table-topic", 
consumed, materialized);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materializedInternal =
+            new MaterializedInternal<>(Materialized.as("table-store"));
+        materializedInternal.generateStoreNameIfNeeded(builder, storePrefix);
+        final KTable<String, String> table = builder.table("table-topic", 
consumed, materializedInternal);
         assertEquals(Collections.singletonList("table-topic"), 
builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = 
playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
index 5fd76f3acfe..1a83ef13152 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/MaterializedInternalTest.java
@@ -49,8 +49,9 @@ public void 
shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
 
         EasyMock.replay(nameProvider);
 
-        final MaterializedInternal<Object, Object, StateStore> materialized
-                = new MaterializedInternal<>(Materialized.with(null, null), 
nameProvider, prefix);
+        final MaterializedInternal<Object, Object, StateStore> materialized =
+            new MaterializedInternal<>(Materialized.with(null, null));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
 
         assertThat(materialized.storeName(), equalTo(generatedName));
         EasyMock.verify(nameProvider);
@@ -59,8 +60,9 @@ public void 
shouldGenerateStoreNameWithPrefixIfProvidedNameIsNull() {
     @Test
     public void shouldUseProvidedStoreNameWhenSet() {
         final String storeName = "store-name";
-        final MaterializedInternal<Object, Object, StateStore> materialized
-                = new MaterializedInternal<>(Materialized.as(storeName), 
nameProvider, prefix);
+        final MaterializedInternal<Object, Object, StateStore> materialized =
+            new MaterializedInternal<>(Materialized.as(storeName));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 
@@ -69,8 +71,9 @@ public void shouldUseStoreNameOfSupplierWhenProvided() {
         final String storeName = "other-store-name";
         EasyMock.expect(supplier.name()).andReturn(storeName).anyTimes();
         EasyMock.replay(supplier);
-        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.as(supplier), 
nameProvider, prefix);
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as(supplier));
+        materialized.generateStoreNameIfNeeded(nameProvider, prefix);
         assertThat(materialized.storeName(), equalTo(storeName));
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index c71f4690c4b..f3e9299cb50 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -73,19 +73,21 @@
     @Before
     public void before() {
         final MaterializedInternal<Object, Object, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
-            Materialized.<Object, Object, KeyValueStore<Bytes, 
byte[]>>with(null, null),
+            Materialized.with(null, null));
+        materialized.generateStoreNameIfNeeded(
             new InternalNameProvider() {
                 @Override
-                public String newProcessorName(String prefix) {
+                public String newProcessorName(final String prefix) {
                     return "processorName";
                 }
 
                 @Override
-                public String newStoreName(String prefix) {
+                public String newStoreName(final String prefix) {
                     return GLOBAL_STORE_NAME;
                 }
             },
-            "store-");
+            "store-"
+        );
 
         builder.addGlobalStore(
             (StoreBuilder) new 
KeyValueStoreMaterializer<>(materialized).materialize().withLoggingDisabled(),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 9ba86ac7e14..fc243c61f8d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -53,10 +53,10 @@
 
     @Test
     public void 
shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store"),
-                                             nameProvider,
-                                             storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as("store"));
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
+
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -69,9 +69,10 @@ public void 
shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnable
 
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withCachingDisabled(), 
nameProvider, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store").withCachingDisabled()
+        );
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -81,9 +82,11 @@ public void 
shouldCreateBuilderThatBuildsStoreWithCachingDisabled() {
 
     @Test
     public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
-                                                     .withLoggingDisabled(), 
nameProvider, storePrefix);
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store")
+                                                     .withLoggingDisabled()
+        );
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -94,10 +97,11 @@ public void 
shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() {
 
     @Test
     public void 
shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() {
-        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store")
                                                      .withCachingDisabled()
-                                                     .withLoggingDisabled(), 
nameProvider, storePrefix);
+                                                     .withLoggingDisabled());
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, String>> builder = 
materializer.materialize();
         final KeyValueStore<String, String> store = builder.build();
@@ -114,8 +118,9 @@ public void 
shouldCreateKeyValueStoreWithTheProvidedInnerStore() {
         EasyMock.expect(supplier.get()).andReturn(store);
         EasyMock.replay(supplier);
 
-        final MaterializedInternal<String, Integer, KeyValueStore<Bytes, 
byte[]>> materialized
-                = new MaterializedInternal<>(Materialized.<String, 
Integer>as(supplier), nameProvider, storePrefix);
+        final MaterializedInternal<String, Integer, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as(supplier));
+        materialized.generateStoreNameIfNeeded(nameProvider, storePrefix);
         final KeyValueStoreMaterializer<String, Integer> materializer = new 
KeyValueStoreMaterializer<>(materialized);
         final StoreBuilder<KeyValueStore<String, Integer>> builder = 
materializer.materialize();
         final KeyValueStore<String, Integer> built = builder.build();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index f3dce521998..936c67b8ef8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -829,7 +829,9 @@ public void shouldUpdateStandbyTask() {
         final TopicPartition partition2 = t2p1;
         internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
             .groupByKey().count(Materialized.<Object, Long, 
KeyValueStore<Bytes, byte[]>>as(storeName1));
-        internalStreamsBuilder.table(topic2, new ConsumedInternal(), new 
MaterializedInternal(Materialized.as(storeName2), internalStreamsBuilder, ""));
+        final MaterializedInternal materialized = new 
MaterializedInternal(Materialized.as(storeName2));
+        materialized.generateStoreNameIfNeeded(internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal(), 
materialized);
 
         final StreamThread thread = createStreamThread(clientId, config, 
false);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> ----------------------------------------------------------
>
>                 Key: KAFKA-6813
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6813
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the 
> deprecated APIs from KIP-120 and KIP-182.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to