This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ce5f23295c KAFKA-18023: Enforcing Explicit Naming for Kafka Streams 
Internal Topics (#18233)
3ce5f23295c is described below

commit 3ce5f23295c6d6877285fb9d45717b290554d25a
Author: Sebastien Viale <[email protected]>
AuthorDate: Mon Feb 24 11:41:42 2025 +0100

    KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics 
(#18233)
    
    Pull request to implement KIP-1111, aims to add a configuration that
    prevents a Kafka Streams application from starting if any of its
    internal topics have auto-generated names, thereby enforcing explicit
    naming for all internal topics and enhancing the stability of the
    application’s topology.
    
    - Repartition Topics:
    
    All repartition topics are created in the
    KStreamImpl.createRepartitionedSource(...) static method. This method
    either receives a name explicitly provided by the user or null and then
    builds the final repartition topic name.
    
    - Changelog Topics and State Store Names:
    
    There are several scenarios where these are created:
      In the MaterializedInternal constructor.
      During KStream/KStream joins.
      During KStream/KTable joins with grace periods.
      With key-value buffers are used in suppressions.
    
    Reviewers: Lucas Brutschy <[email protected]>, Sophie Blee-Goldman 
<[email protected]>
---
 docs/streams/developer-guide/config-streams.html   |  84 ++--
 .../developer-guide/dsl-topology-naming.html       |  13 +
 docs/streams/upgrade-guide.html                    |  10 +
 .../org/apache/kafka/streams/StreamsBuilder.java   |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |  11 +
 .../org/apache/kafka/streams/TopologyConfig.java   |  13 +-
 .../internals/CogroupedStreamAggregateBuilder.java |  20 +-
 .../internals/GroupedStreamAggregateBuilder.java   |  10 +-
 .../kstream/internals/InternalStreamsBuilder.java  |   6 +-
 .../streams/kstream/internals/KStreamImpl.java     |  42 +-
 .../streams/kstream/internals/KStreamImplJoin.java |  17 +
 .../streams/kstream/internals/KTableImpl.java      |  12 +-
 .../kstream/internals/MaterializedInternal.java    |   8 +
 .../internals/InternalResourcesNaming.java         |  73 +++
 .../internals/InternalTopologyBuilder.java         |  52 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   | 553 +++++++++++++++++++++
 .../apache/kafka/streams/StreamsConfigTest.java    |  14 +
 17 files changed, 881 insertions(+), 61 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index a7d82b97ccd..7f9aaa60616 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -79,6 +79,7 @@ settings.put(... , ...);</code></pre>
               <li><a class="reference internal" href="#default-value-serde" 
id="id9">default.value.serde</a></li>
               <li><a class="reference internal" 
href="#deserialization-exception-handler" 
id="id7">deserialization.exception.handler</a></li>
               <li><a class="reference internal" href="#enable-metrics-push" 
id="id43">enable.metrics.push</a></li>
+              <li><a class="reference internal" 
href="#ensure-explicit-internal-resource-naming" 
id="id46">ensure.explicit.internal.resource.naming</a></li>
               <li><a class="reference internal" 
href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
               <li><a class="reference internal" href="#max-task-idle-ms" 
id="id28">max.task.idle.ms</a></li>
               <li><a class="reference internal" href="#max-warmup-replicas" 
id="id29">max.warmup.replicas</a></li>
@@ -348,17 +349,26 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
             </td>
             
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
           </tr>
-          <tr class="row-odd"><td>log.summary.interval.ms</td>
+          <tr class="row-odd"><td>ensure.explicit.internal.resource.naming</td>
+            <td>High</td>
+            <td colspan="2">
+              Whether to enforce explicit naming for all internal resources of 
the topology, including internal
+              topics (e.g., changelog and repartition topics) and their 
associated state stores.
+              When enabled, the application will refuse to start if any 
internal resource has an auto-generated name.
+            </td>
+            <td><code class="docutils literal"><span 
class="pre">false</span></code></td>
+          </tr>
+          <tr class="row-even"><td>log.summary.interval.ms</td>
             <td>Low</td>
             <td colspan="2">The output interval in milliseconds for logging 
summary information (disabled if negative).</td>
             <td><code class="docutils literal"><span 
class="pre">120000</span></code> (2 minutes)</td>
           </tr>
-          <tr class="row-even"><td>enable.metrics.push</td>
+          <tr class="row-odd"><td>enable.metrics.push</td>
             <td>Low</td>
             <td colspan="2">Whether to enable pushing of client metrics to the 
cluster, if the cluster has a client metrics subscription which matches this 
client.</td>
             <td><code class="docutils literal"><span 
class="pre">true</span></code></td>
           </tr>
-          <tr class="row-odd"><td>max.task.idle.ms</td>
+          <tr class="row-even"><td>max.task.idle.ms</td>
             <td>Medium</td>
             <td colspan="2">
               <p>
@@ -377,76 +387,76 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
             </td>
             <td><code class="docutils literal"><span 
class="pre">0</span></code></td>
           </tr>
-          <tr class="row-even"><td>max.warmup.replicas</td>
+          <tr class="row-odd"><td>max.warmup.replicas</td>
             <td>Medium</td>
             <td colspan="2">The maximum number of warmup replicas (extra 
standbys beyond the configured num.standbys) that can be assigned at once.</td>
             <td><code class="docutils literal"><span 
class="pre">2</span></code></td>
           </tr>
-          <tr class="row-odd"><td>metric.reporters</td>
+          <tr class="row-even"><td>metric.reporters</td>
             <td>Low</td>
             <td colspan="2">A list of classes to use as metrics reporters.</td>
             <td>the empty list</td>
           </tr>
-          <tr class="row-even"><td>metrics.num.samples</td>
+          <tr class="row-odd"><td>metrics.num.samples</td>
             <td>Low</td>
             <td colspan="2">The number of samples maintained to compute 
metrics.</td>
             <td><code class="docutils literal"><span 
class="pre">2</span></code></td>
           </tr>
-          <tr class="row-odd"><td>metrics.recording.level</td>
+          <tr class="row-even"><td>metrics.recording.level</td>
             <td>Low</td>
             <td colspan="2">The highest recording level for metrics.</td>
             <td><code class="docutils literal"><span 
class="pre">INFO</span></code></td>
           </tr>
-          <tr class="row-even"><td>metrics.sample.window.ms</td>
+          <tr class="row-odd"><td>metrics.sample.window.ms</td>
             <td>Low</td>
             <td colspan="2">The window of time in milliseconds a metrics 
sample is computed over.</td>
             <td><code class="docutils literal"><span 
class="pre">30000</span></code> (30 seconds)</td>
           </tr>
-          <tr class="row-odd"><td>num.standby.replicas</td>
+          <tr class="row-even"><td>num.standby.replicas</td>
             <td>High</td>
             <td colspan="2">The number of standby replicas for each task.</td>
             <td><code class="docutils literal"><span 
class="pre">0</span></code></td>
           </tr>
-          <tr class="row-even"><td>num.stream.threads</td>
+          <tr class="row-odd"><td>num.stream.threads</td>
             <td>Medium</td>
             <td colspan="2">The number of threads to execute stream 
processing.</td>
             <td><code class="docutils literal"><span 
class="pre">1</span></code></td>
           </tr>
-          <tr class="row-odd"><td>probing.rebalance.interval.ms</td>
+          <tr class="row-even"><td>probing.rebalance.interval.ms</td>
             <td>Low</td>
             <td colspan="2">The maximum time in milliseconds to wait before 
triggering a rebalance to probe for warmup replicas that have sufficiently 
caught up.</td>
             <td><code class="docutils literal"><span 
class="pre">600000</span></code> (10 minutes)</td>
           </tr>
-          <tr class="row-even"><td>processing.exception.handler</td>
+          <tr class="row-odd"><td>processing.exception.handler</td>
             <td>Medium</td>
             <td colspan="2">Exception handling class that implements the <code 
class="docutils literal"><span 
class="pre">ProcessingExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span 
class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-odd"><td>processing.guarantee</td>
+          <tr class="row-even"><td>processing.guarantee</td>
             <td>Medium</td>
             <td colspan="2">The processing mode. Can be either <code 
class="docutils literal"><span class="pre">"at_least_once"</span></code>
               or <code class="docutils literal"><span 
class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker 
version 2.5+).
               See <a class="reference internal" 
href="#streams-developer-guide-processing-guarantee"><span class="std 
std-ref">Processing Guarantee</span></a>.</td>.
             <td><code class="docutils literal"><span 
class="pre">"at_least_once"</span></code></td>
           </tr>
-          <tr class="row-even"><td>processor.wrapper.class</td>
+          <tr class="row-odd"><td>processor.wrapper.class</td>
             <td>Medium</td>
             <td colspan="2">A class or class name implementing the <code 
class="docutils literal"><span class="pre">ProcessorWrapper</span></code> 
interface.
               Must be passed in when creating the topology, and will not be 
applied unless passed in to the appropriate constructor as a TopologyConfig. 
You should
               use the <code class="docutils literal"><span 
class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for 
DSL applications, and the
               <code class="docutils literal"><span 
class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI 
applications.</td>
           </tr>
-          <tr class="row-odd"><td>production.exception.handler</td>
+          <tr class="row-even"><td>production.exception.handler</td>
             <td>Medium</td>
             <td colspan="2">Exception handling class that implements the <code 
class="docutils literal"><span 
class="pre">ProductionExceptionHandler</span></code> interface.</td>
             <td><code class="docutils literal"><span 
class="pre">DefaultProductionExceptionHandler</span></code></td>
           </tr>
-          <tr class="row-even"><td>poll.ms</td>
+          <tr class="row-odd"><td>poll.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to block 
waiting for input.</td>
             <td><code class="docutils literal"><span 
class="pre">100</span></code></td>
           </tr>
-          <tr class="row-odd"><td>rack.aware.assignment.strategy</td>
+          <tr class="row-even"><td>rack.aware.assignment.strategy</td>
             <td>Low</td>
             <td colspan="2">The strategy used for rack aware assignment. 
Acceptable value are
               <code class="docutils literal"><span 
class="pre">"none"</span></code> (default),
@@ -455,7 +465,7 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
               See <a class="reference internal" 
href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware 
Assignment Strategy</span></a>.</td>
             <td><code class="docutils literal"><span 
class="pre">"none"</span></code></td>
           </tr>
-          <tr class="row-even><td>rack.aware.assignment.tags</td>
+          <tr class="row-odd"><td>rack.aware.assignment.tags</td>
             <td>Low</td>
             <td colspan="2">List of tag keys used to distribute standby 
replicas across Kafka Streams
               clients. When configured, Kafka Streams will make a best-effort 
to distribute the standby tasks over
@@ -463,72 +473,72 @@ 
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
               See <a class="reference internal" 
href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware 
Assignment Tags</span></a>.</td>
             <td>the empty list</td>
           </tr>
-          <tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
+          <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
             <td>Low</td>
             <td colspan="2">Cost associated with moving tasks from existing 
assignment.
               See <a class="reference internal" 
href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack 
Aware Assignment Non-Overlap-Cost</span></a>.</td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
           </tr>
-          <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
+          <tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
             <td>Low</td>
             <td colspan="2">Cost associated with cross rack traffic.
               See <a class="reference internal" 
href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware 
Assignment Traffic-Cost</span></a>.</td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
           </tr>
-          <tr class="row-odd"><td>replication.factor</td>
+          <tr class="row-even"><td>replication.factor</td>
             <td>Medium</td>
             <td colspan="2">The replication factor for changelog topics and 
repartition topics created by the application.
               The default of <code>-1</code> (meaning: use broker default 
replication factor) requires broker version 2.4 or newer.</td>
             <td><code class="docutils literal"><span 
class="pre">-1</span></code></td>
           </tr>
-          <tr class="row-even"><td>retry.backoff.ms</td>
+          <tr class="row-odd"><td>retry.backoff.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds, before a 
request is retried.</td>
             <td><code class="docutils literal"><span 
class="pre">100</span></code></td>
           </tr>
-          <tr class="row-odd"><td>rocksdb.config.setter</td>
+          <tr class="row-even"><td>rocksdb.config.setter</td>
             <td>Medium</td>
             <td colspan="2">The RocksDB configuration.</td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
           </tr>
-          <tr class="row-even"><td>state.cleanup.delay.ms</td>
+          <tr class="row-odd"><td>state.cleanup.delay.ms</td>
             <td>Low</td>
             <td colspan="2">The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.</td>
             <td><code class="docutils literal"><span 
class="pre">600000</span></code></td> (10 minutes)</td>
           </tr>
-          <tr class="row-odd"><td>state.dir</td>
+          <tr class="row-even"><td>state.dir</td>
             <td>High</td>
             <td colspan="2">Directory location for state stores.</td>
             <td><code class="docutils literal"><span 
class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
           </tr>
-          <tr class="row-even"><td>task.assignor.class</td>
+          <tr class="row-odd"><td>task.assignor.class</td>
             <td>Medium</td>
             <td colspan="2">A task assignor class or class name implementing 
the <code>TaskAssignor</code> interface.</td>
             <td>The high-availability task assignor.</td>
           </tr>
-          <tr class="row-odd"><td>task.timeout.ms</td>
+          <tr class="row-even"><td>task.timeout.ms</td>
             <td>Medium</td>
             <td colspan="2">The maximum amount of time in milliseconds a task 
might stall due to internal errors and retries until an error is raised. For a 
timeout of <code>0 ms</code>, a task would raise an error for the first 
internal error. For any timeout larger than <code>0 ms</code>, a task will 
retry at least once before an error is raised.</td>
             <td><code class="docutils literal"><span 
class="pre">300000</span></code></td> (5 minutes)</td>
           </tr>
-          <tr class="row-even"><td>topology.optimization</td>
+          <tr class="row-odd"><td>topology.optimization</td>
             <td>Medium</td>
             <td colspan="2">A configuration telling Kafka Streams if it should 
optimize the topology and what optimizations to apply. Acceptable values are: 
<code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), 
<code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated 
list of specific optimizations: 
<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> 
(<code>reuse.ktable.source.topics</code>), 
<code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.r [...]
               <code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> 
(<code>single.store.self.join</code>). </td>
             <td><code class="docutils literal"><span 
class="pre">"NO_OPTIMIZATION"</span></code></td>
           </tr>
-          <tr class="row-odd"><td>upgrade.from</td>
+          <tr class="row-even"><td>upgrade.from</td>
             <td>Medium</td>
             <td colspan="2">The version you are upgrading from during a 
rolling upgrade.
               See <a class="reference internal" 
href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade 
From</span></a></td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
           </tr>
-          <tr 
class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+          <tr 
class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
             <td>Low</td>
             <td colspan="2">Added to a windows maintainMs to ensure data is 
not deleted from the log prematurely. Allows for clock drift.</td>
             <td><code class="docutils literal"><span 
class="pre">86400000</span></code></td> (1 day)</td>
           </tr>
-          <tr class="row-odd"><td>window.size.ms</td>
+          <tr class="row-even"><td>window.size.ms</td>
             <td>Low</td>
             <td colspan="2">Sets window size for the deserializer in order to 
calculate window end times.</td>
             <td><code class="docutils literal"><span 
class="pre">null</span></code></td>
@@ -753,6 +763,18 @@ 
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
               <p>This is discussed in more detail in <a class="reference 
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std 
std-ref">Data types and serialization</span></a>.</p>
             </div></blockquote>
         </div>
+        <div class="section" id="ensure-explicit-internal-resource-naming">
+          <h4><a class="toc-backref" 
href="#id46">ensure.explicit.internal.resource.naming</a><a class="headerlink" 
href="#ensure-explicit-internal-resource-naming" title="Permalink to this 
headline"></a></h4>
+          <blockquote>
+            <div>
+              <p>
+                Whether to enforce explicit naming for all internal resources 
of the topology, including internal
+                topics (e.g., changelog and repartition topics) and their 
associated state stores.
+                When enabled, the application will refuse to start if any 
internal resource has an auto-generated name.
+              </p>
+            </div>
+          </blockquote>
+        </div>
       <div class="section" id="rack-aware-assignment-non-overlap-cost">
         <h4><a class="toc-backref" 
href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink" 
href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this 
headline"></a></h4>
         <blockquote>
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html 
b/docs/streams/developer-guide/dsl-topology-naming.html
index ec3bc857c10..832806050a5 100644
--- a/docs/streams/developer-guide/dsl-topology-naming.html
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -300,6 +300,19 @@ stream.filter((k, v) -> v != null && v.length() >= 6)
                          <td>Stream/Table non-stateful 
operations</td><td>Named</td>
                      </tr>
                  </table>
+
+                 To further enforce best practices, Kafka Streams provides a 
configuration option,
+                 <code class="docutils literal"><span 
class="pre">ensure.explicit.internal.resource.naming</span></code>:
+                 <pre class="line-numbers"><code class="language-java">/
+            Properties props = new Properties();
+            
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+                 </code></pre>
+                 This parameter ensures that all internal topics, state 
stores, and changelog topics have explicitly defined names. When this 
configuration
+                 is enabled, a Kafka Streams application will not start if any 
of these components rely on auto-generated names. This guarantees
+                 stability across topology updates, as manually defined names 
remain unchanged even when new processors or transformations are added.
+                 Enforcing explicit naming is particularly important in 
production environments, where consistency and backward compatibility are 
essential
+                 for maintaining reliable stream processing applications.
+
        </p>
 </div>
 
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 89fd62fd9d6..257187bfa45 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -139,6 +139,16 @@
         More details about the new config 
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization";>KIP-295</a>.
     </p>
 
+    <h3><a id="streams_api_changes_410" 
href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
+
+    <p>
+        The introduction of <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1111:+Enforcing+Explicit+Naming+for+Kafka+Streams+Internal+Topics";>KIP-1111</a>
+        enables you to enforce explicit naming for all internal resources of 
the topology, including internal topics (e.g., changelog and repartition 
topics) and their associated state stores.
+        This ensures that every internal resource is named before the Kafka 
Streams application is deployed, which is essential for upgrading your topology.
+        You can enable this feature via <code>StreamsConfig</code> using the 
<code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code> 
parameter.
+        When set to <code>true</code>, the application will refuse to start if 
any internal resource has an auto-generated name.
+    </p>
+    
     <h3><a id="streams_api_changes_400" 
href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
 
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7037e8d7fd3..e56a4cbfb4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -384,7 +384,7 @@ public class StreamsBuilder {
 
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
             new MaterializedInternal<>(
-                Materialized.with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()),
+                Materialized.<K, V, KeyValueStore<Bytes, 
byte[]>>with(consumedInternal.keySerde(), 
consumedInternal.valueSerde()).withLoggingDisabled(),
                 internalStreamsBuilder,
                 topic + "-",
                 true /* force materializing global tables */);
@@ -457,7 +457,7 @@ public class StreamsBuilder {
         Objects.requireNonNull(materialized, "materialized can't be null");
         final ConsumedInternal<K, V> consumedInternal = new 
ConsumedInternal<>(consumed);
         // always use the serdes from consumed
-        
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
+        
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();
 
         final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> 
materializedInternal =
             new MaterializedInternal<>(materialized, internalStreamsBuilder, 
topic + "-");
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6f0e21025c3..fa9e1bd48f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -592,6 +592,12 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable 
pushing of internal client metrics for (main, restore, and global) consumers, 
producers, and admin clients." +
         " The cluster must have a client metrics subscription which 
corresponds to a client.";
 
+    /** {@code ensure.explicit.internal.resource.naming} */
+    public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG 
= "ensure.explicit.internal.resource.naming";
+    static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC = 
"Whether to enforce explicit naming for all internal resources of the topology, 
including internal" +
+        " topics (e.g., changelog and repartition topics) and their associated 
state stores." +
+        " When enabled, the application will refuse to start if any internal 
resource has an auto-generated name.";
+
     /** {@code log.summary.interval.ms} */
     public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = 
"log.summary.interval.ms";
     private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output 
interval in milliseconds for logging summary information.\n" +
@@ -869,6 +875,11 @@ public class StreamsConfig extends AbstractConfig {
                     Importance.HIGH,
                     STATE_DIR_DOC,
                     "${java.io.tmpdir}")
+            .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
+                    Type.BOOLEAN,
+                    false,
+                    Importance.HIGH,
+                    ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)
 
             // MEDIUM
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index e96d5281d09..da8c246b26d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -55,6 +55,8 @@ import static 
org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_H
 import static 
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
 import static 
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC;
 import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
 import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
@@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig {
                 Type.CLASS,
                 DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
                 Importance.LOW,
-                DSL_STORE_SUPPLIERS_CLASS_DOC);
+                DSL_STORE_SUPPLIERS_CLASS_DOC)
+            .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
+                Type.BOOLEAN,
+                false,
+                Importance.HIGH,
+                ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC);
     }
     private static final Logger log = 
LoggerFactory.getLogger(TopologyConfig.class);
 
@@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig {
     public final Supplier<DeserializationExceptionHandler> 
deserializationExceptionHandlerSupplier;
     public final Supplier<ProcessingExceptionHandler> 
processingExceptionHandlerSupplier;
 
+    public final boolean ensureExplicitInternalResourceNaming;
+
     public TopologyConfig(final StreamsConfig configs) {
         this(null, configs, mkObjectProperties(configs.originals()));
     }
@@ -272,6 +281,8 @@ public final class TopologyConfig extends AbstractConfig {
         } else {
             dslStoreSuppliers = 
globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
         }
+
+        ensureExplicitInternalResourceNaming = 
globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
     }
 
     @Deprecated
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 2bd81e43d27..f41f52dac8b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -59,7 +59,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final boolean isOutputVersioned) {
-        processRepartitions(groupPatterns, storeFactory.storeName());
+        processRepartitions(groupPatterns, storeFactory.storeName(), 
queryableName);
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         
@@ -94,7 +94,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                                   final Serde<VOut> valueSerde,
                                                   final String queryableName,
                                                   final Windows<W> windows) {
-        processRepartitions(groupPatterns, storeFactory.storeName());
+        processRepartitions(groupPatterns, storeFactory.storeName(), 
queryableName);
 
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
@@ -135,7 +135,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final String queryableName,
                                 final SessionWindows sessionWindows,
                                 final Merger<? super K, VOut> sessionMerger) {
-        processRepartitions(groupPatterns, storeFactory.storeName());
+        processRepartitions(groupPatterns, storeFactory.storeName(), 
queryableName);
         final Collection<GraphNode> processors = new ArrayList<>();
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         int counter = 0;
@@ -175,7 +175,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                                 final Serde<VOut> valueSerde,
                                 final String queryableName,
                                 final SlidingWindows slidingWindows) {
-        processRepartitions(groupPatterns, storeFactory.storeName());
+        processRepartitions(groupPatterns, storeFactory.storeName(), 
queryableName);
         final Collection<KStreamAggProcessorSupplier> parentProcessors = new 
ArrayList<>();
         final Collection<GraphNode> processors = new ArrayList<>();
         int counter = 0;
@@ -206,7 +206,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
     }
 
     private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, 
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
-                                     final String storeName) {
+                                     final String storeName,
+                                     final String queryableName) {
         for (final KGroupedStreamImpl<K, ?> repartitionReqs : 
groupPatterns.keySet()) {
 
             if (repartitionReqs.repartitionRequired) {
@@ -216,8 +217,9 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
                 final String repartitionNamePrefix = 
repartitionReqs.userProvidedRepartitionTopicName != null ?
                     repartitionReqs.userProvidedRepartitionTopicName : 
storeName;
 
-                createRepartitionSource(repartitionNamePrefix, 
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
+                final boolean isRepartitionTopicNameProvidedByUser = 
repartitionReqs.userProvidedRepartitionTopicName != null || queryableName != 
null;
 
+                createRepartitionSource(repartitionNamePrefix, 
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde, 
isRepartitionTopicNameProvidedByUser);
                 if (!parentNodes.containsKey(repartitionReqs)) {
                     final GraphNode repartitionNode = 
repartitionNodeBuilder.build();
                     builder.addGraphNode(repartitionReqs.graphNode, 
repartitionNode);
@@ -270,14 +272,16 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
     private <VIn> void createRepartitionSource(final String 
repartitionTopicNamePrefix,
                                                final 
OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
                                                final Serde<K> keySerde,
-                                               final Serde<?> valueSerde) {
+                                               final Serde<?> valueSerde,
+                                               final boolean 
isRepartitionTopicNameProvidedByUser) {
 
         KStreamImpl.createRepartitionedSource(builder,
             keySerde,
             (Serde<VIn>) valueSerde,
             repartitionTopicNamePrefix,
             null,
-            (OptimizableRepartitionNodeBuilder<K, VIn>) 
optimizableRepartitionNodeBuilder);
+            (OptimizableRepartitionNodeBuilder<K, VIn>) 
optimizableRepartitionNodeBuilder,
+            isRepartitionTopicNameProvidedByUser);
 
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index b99034c5306..023513d0704 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -126,8 +126,10 @@ class GroupedStreamAggregateBuilder<K, V> {
 
         if (repartitionRequired) {
             final OptimizableRepartitionNodeBuilder<K, V> 
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
+
             final String repartitionTopicPrefix = 
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : 
storeName;
-            sourceName = createRepartitionSource(repartitionTopicPrefix, 
repartitionNodeBuilder);
+
+            sourceName = createRepartitionSource(repartitionTopicPrefix, 
repartitionNodeBuilder, userProvidedRepartitionTopicName != null || 
queryableStoreName != null);
 
             // First time through we need to create a repartition node.
             // Any subsequent calls to GroupedStreamAggregateBuilder#build we 
check if
@@ -157,14 +159,16 @@ class GroupedStreamAggregateBuilder<K, V> {
      * @return the new sourceName of the repartitioned source
      */
     private String createRepartitionSource(final String 
repartitionTopicNamePrefix,
-                                           final 
OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
+                                           final 
OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder,
+                                           final boolean 
isRepartitionTopicNameProvidedByUser) {
 
         return KStreamImpl.createRepartitionedSource(builder,
                                                      keySerde,
                                                      valueSerde,
                                                      
repartitionTopicNamePrefix,
                                                      null,
-                                                     
optimizableRepartitionNodeBuilder);
+                                                     
optimizableRepartitionNodeBuilder,
+                                                     
isRepartitionTopicNameProvidedByUser);
 
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c1a5bff2e05..968276bd501 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -328,6 +328,9 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
             }
         }
         internalTopologyBuilder.validateCopartition();
+
+        internalTopologyBuilder.checkUnprovidedNames();
+
     }
 
     /**
@@ -588,7 +591,8 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
                 valueSerde,
                 repartitionTopicName,
                 null,
-                repartitionNodeBuilder
+                repartitionNodeBuilder,
+                true
         );
 
         // ensures setting the repartition topic to the name of the
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8fb1327955e..1927aed03fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -525,7 +526,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
             valueSerde,
             name,
             repartitionedInternal.streamPartitioner(),
-            
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties)
+            
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties),
+            repartitionedInternal.name() != null
         );
 
         final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode 
= unoptimizableRepartitionNodeBuilder.build();
@@ -633,7 +635,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
                 valueSerdeOverride,
                 name,
                 null,
-                repartitionNodeBuilder
+                repartitionNodeBuilder,
+                namedInternal.name() != null
             );
 
             tableParentNode = repartitionNodeBuilder.build();
@@ -895,21 +898,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         if (joinThis.repartitionRequired) {
             final String joinThisName = joinThis.name;
             final String leftJoinRepartitionTopicName = 
name.suffixWithOrElseGet("-left", joinThisName);
+
             joinThis = joinThis.repartitionForJoin(
                 leftJoinRepartitionTopicName,
                 streamJoinedInternal.keySerde(),
-                streamJoinedInternal.valueSerde()
-            );
+                streamJoinedInternal.valueSerde(),
+                name.name() != null);
         }
 
         if (joinOther.repartitionRequired) {
             final String joinOtherName = joinOther.name;
             final String rightJoinRepartitionTopicName = 
name.suffixWithOrElseGet("-right", joinOtherName);
+
             joinOther = joinOther.repartitionForJoin(
                 rightJoinRepartitionTopicName,
                 streamJoinedInternal.keySerde(),
-                streamJoinedInternal.otherValueSerde()
-            );
+                streamJoinedInternal.otherValueSerde(),
+                name.name() != null);
         }
 
         joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
@@ -928,7 +933,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
      */
     private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
                                                  final Serde<K> 
keySerdeOverride,
-                                                 final Serde<V> 
valueSerdeOverride) {
+                                                 final Serde<V> 
valueSerdeOverride,
+                                                 final boolean 
isRepartitionTopicNameProvidedByUser) {
         final Serde<K> repartitionKeySerde = keySerdeOverride != null ? 
keySerdeOverride : keySerde;
         final Serde<V> repartitionValueSerde = valueSerdeOverride != null ? 
valueSerdeOverride : valueSerde;
         final OptimizableRepartitionNodeBuilder<K, V> 
optimizableRepartitionNodeBuilder =
@@ -942,7 +948,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
             repartitionValueSerde,
             repartitionName,
             null,
-            optimizableRepartitionNodeBuilder);
+            optimizableRepartitionNodeBuilder,
+            isRepartitionTopicNameProvidedByUser);
 
         if (repartitionNode == null || !name.equals(repartitionName)) {
             repartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -965,11 +972,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
         final Serde<Value> valueSerde,
         final String repartitionTopicNamePrefix,
         final StreamPartitioner<Key, Value> streamPartitioner,
-        final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode> 
baseRepartitionNodeBuilder
-    ) {
+        final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode> 
baseRepartitionNodeBuilder,
+        final boolean isRepartitionTopicNameProvidedByUser) {
+
         final String repartitionTopicName = 
repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ?
             repartitionTopicNamePrefix :
             repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
+        if (!isRepartitionTopicNameProvidedByUser) {
+            
builder.internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withRepartitionTopic(repartitionTopicName).build());
+        }
 
         // Always need to generate the names to burn index counter for 
compatibility
         final String genSinkName = builder.newProcessorName(SINK_NAME);
@@ -1051,7 +1062,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
                     name != null ? name : this.name,
                     joinedInternal.keySerde(),
-                    joinedInternal.leftValueSerde()
+                    joinedInternal.leftValueSerde(),
+                    name != null
             );
             return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joinedInternal, false);
         } else {
@@ -1091,7 +1103,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(
                     name != null ? name : this.name,
                     joinedInternal.keySerde(),
-                    joinedInternal.leftValueSerde()
+                    joinedInternal.leftValueSerde(),
+                    name != null
             );
             return thisStreamRepartitioned.doStreamTableJoin(table, joiner, 
joinedInternal, true);
         } else {
@@ -1124,6 +1137,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
                 joinedInternal.gracePeriod(),
                 name)
             );
+
+            if (joinedInternal.name() == null) {
+                final InternalResourcesNaming internalResourcesNaming = 
InternalResourcesNaming.builder().withStateStore(bufferName).withChangelogTopic(bufferName
 + "-changelog").build();
+                
internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming);
+            }
         }
 
         final ProcessorSupplier<K, V, K, VOut> processorSupplier = new 
KStreamKTableJoin<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index aeece23cf34..902079863fd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -30,6 +30,7 @@ import 
org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
 import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.Stores;
@@ -168,6 +169,14 @@ class KStreamImplJoin {
             );
         }
 
+        if (userProvidedBaseStoreName == null) {
+            addInternalResourceName(thisWindowStore);
+            addInternalResourceName(otherWindowStore);
+            if (outerJoinWindowStore.isPresent()) {
+                addInternalResourceName(outerJoinWindowStore.get());
+            }
+        }
+
         // Time-shared between joins to keep track of the maximum stream time
         final TimeTrackerSupplier sharedTimeTrackerSupplier = new 
TimeTrackerSupplier();
 
@@ -261,4 +270,12 @@ class KStreamImplJoin {
             valueSerde
         ));
     }
+
+    private void addInternalResourceName(final StoreFactory windowStore) {
+        final InternalResourcesNaming.Builder thisInternalResourcesNaming = 
InternalResourcesNaming.builder().withStateStore(windowStore.storeName());
+        if (windowStore.loggingEnabled()) {
+            
thisInternalResourcesNaming.withChangelogTopic(windowStore.storeName() + 
"-changelog");
+        }
+        
builder.internalTopologyBuilder().addImplicitInternalNames(thisInternalResourcesNaming.build());
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7f37a27149e..e9e9035981d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -66,6 +66,7 @@ import 
org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
 import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import 
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
@@ -549,8 +550,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
 
         final SuppressedInternal<K> suppressedInternal = 
buildSuppress(suppressed, name);
 
-        final String storeName =
-            suppressedInternal.name() != null ? suppressedInternal.name() + 
"-store" : builder.newStoreName(SUPPRESS_NAME);
+        final String storeName;
+        if (suppressedInternal.name() != null) {
+            storeName = suppressedInternal.name() + "-store";
+        } else {
+            storeName = builder.newStoreName(SUPPRESS_NAME);
+            if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
+                
internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withChangelogTopic(storeName
 + "-changelog").build());
+            }
+        }
 
         final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, 
Change<V>>> storeBuilder;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index d6cd130ba6d..7bd727b09be 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.StoreSupplier;
 
@@ -53,6 +54,13 @@ public final class MaterializedInternal<K, V, S extends 
StateStore> extends Mate
         queryable = forceQueryable || storeName() != null;
         if (storeName() == null && nameProvider != null) {
             storeName = nameProvider.newStoreName(generatedStorePrefix);
+            if (nameProvider instanceof InternalStreamsBuilder) {
+                final InternalResourcesNaming.Builder internalResourcesNaming 
= InternalResourcesNaming.builder().withStateStore(storeName);
+                if (loggingEnabled()) {
+                    internalResourcesNaming.withChangelogTopic(storeName + 
"-changelog");
+                }
+                ((InternalStreamsBuilder) 
nameProvider).internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming.build());
+            }
         }
 
         // if store type is not configured during creating Materialized, then 
try to get the topologyConfigs from nameProvider
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
new file mode 100644
index 00000000000..f4b46088f3a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public final class InternalResourcesNaming {
+
+    private final String repartitionTopic;
+    private final String changelogTopic;
+    private final String stateStore;
+
+    private InternalResourcesNaming(final Builder builder) {
+        this.repartitionTopic = builder.repartitionTopic;
+        this.changelogTopic = builder.changelogTopic;
+        this.stateStore = builder.stateStore;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static final class Builder {
+        private String repartitionTopic;
+        private String changelogTopic;
+        private String stateStore;
+
+        private Builder() {}
+
+        public Builder withRepartitionTopic(final String repartitionTopic) {
+            this.repartitionTopic = repartitionTopic;
+            return this;
+        }
+
+        public Builder withChangelogTopic(final String changelogTopic) {
+            this.changelogTopic = changelogTopic;
+            return this;
+        }
+
+        public Builder withStateStore(final String stateStore) {
+            this.stateStore = stateStore;
+            return this;
+        }
+
+        public InternalResourcesNaming build() {
+            return new InternalResourcesNaming(this);
+        }
+    }
+
+    public String repartitionTopic() {
+        return repartitionTopic;
+    }
+
+    public String changelogTopic() {
+        return changelogTopic;
+    }
+
+    public String stateStore() {
+        return stateStore;
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 23f4f343fad..2503a358419 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -64,12 +66,14 @@ import java.util.TreeSet;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 
 public class InternalTopologyBuilder {
 
     public InternalTopologyBuilder() {
         this.topologyName = null;
+        this.ensureExplicitInternalResourceNaming = false;
         this.processorWrapper = new NoOpProcessorWrapper();
     }
 
@@ -78,7 +82,7 @@ public class InternalTopologyBuilder {
 
         this.topologyConfigs = topologyConfigs;
         this.topologyName = topologyConfigs.topologyName;
-
+        this.ensureExplicitInternalResourceNaming = 
topologyConfigs.ensureExplicitInternalResourceNaming;
         try {
             processorWrapper = topologyConfigs.getConfiguredInstance(
                 PROCESSOR_WRAPPER_CLASS_CONFIG,
@@ -194,6 +198,10 @@ public class InternalTopologyBuilder {
 
     private boolean hasPersistentStores = false;
 
+    private final boolean ensureExplicitInternalResourceNaming;
+
+    private final Set<InternalResourcesNaming> implicitInternalNames = new 
LinkedHashSet<>();
+
     public static class ReprocessFactory<KIn, VIn, KOut, VOut> {
 
         private final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier;
@@ -2293,4 +2301,46 @@ public class InternalTopologyBuilder {
             processorWrapper.wrapProcessorSupplier(name, processorSupplier)
         );
     }
+
+    public void addImplicitInternalNames(final InternalResourcesNaming 
internalResourcesNaming) {
+        implicitInternalNames.add(internalResourcesNaming);
+    }
+
+    public void checkUnprovidedNames() {
+        if (!implicitInternalNames.isEmpty()) {
+            final StringBuilder result = new StringBuilder();
+            final List<String> changelogTopics = new ArrayList<>();
+            final List<String> stateStores = new ArrayList<>();
+            final List<String> repartitionTopics = new ArrayList<>();
+            for (final InternalResourcesNaming internalResourcesNaming : 
implicitInternalNames) {
+                if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+                    
changelogTopics.add(internalResourcesNaming.changelogTopic());
+                }
+                if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+                    stateStores.add(internalResourcesNaming.stateStore());
+                }
+                if 
(!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+                    
repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+                }
+            }
+            if (!changelogTopics.isEmpty()) {
+                result.append(String.format("Following changelog topic(s) has 
not been named: %s%n", String.join(", ", changelogTopics)));
+            }
+            if (!stateStores.isEmpty()) {
+                result.append(String.format("Following state store(s) has not 
been named: %s%n", String.join(", ", stateStores)));
+            }
+            if (!repartitionTopics.isEmpty()) {
+                result.append(String.format("Following repartition topic(s) 
has not been named: %s%n", String.join(", ", repartitionTopics)));
+            }
+            if (ensureExplicitInternalResourceNaming) {
+                throw new TopologyException(result.toString());
+            } else {
+                log.warn("Explicit naming for internal resources is currently 
disabled. If you want to enforce" +
+                    " user-defined names for all internal resources, set " + 
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG +
+                    " to true. Note: Changing internal resource names may 
require a full streams application reset for an" +
+                    " already deployed application. Consult the documentation 
on naming operators for more details. {}", result);
+            }
+        }
+    }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 08e413703c1..a6c79806319 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -22,9 +22,11 @@ import 
org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Topology.AutoOffsetReset;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Branched;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
@@ -35,12 +37,14 @@ import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.Printed;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Repartitioned;
 import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.SlidingWindows;
 import org.apache.kafka.streams.kstream.StreamJoined;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TableJoined;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -54,6 +58,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.InMemorySessionStore;
@@ -90,6 +95,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
@@ -103,6 +109,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -2354,6 +2361,552 @@ public class StreamsBuilderTest {
         assertThrows(TopologyException.class, builder::build);
     }
 
+    @Test
+    void 
shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingEnabled() {
+        final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.Long())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void 
shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingDisabled() {
+        final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+                Grouped.with("repartition-name", Serdes.String(), 
Serdes.String()),
+                Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>with(Serdes.String(), Serdes.Long())
+                    .withLoggingDisabled()
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldNotThrowWhenGroupByAggregationWithMaterializedName() {
+        final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+            Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldNotThrowWhenGroupByAggregationWithRepartitionNameAndMaterialized() {
+        final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldThrowWhenGroupByAggregationWithoutRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+            Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.Long())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+    }
+
+    private StreamsBuilder buildWithGroupByAggregationTopology(final 
Grouped<String, String> grouped,
+                                                               final 
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream("input1");
+        stream
+            .groupBy((k, v) -> v, grouped)
+            .count(materialized)
+            .toStream()
+            .to("output", Produced.as("sink"));
+        return builder;
+    }
+
+    @Test
+    void 
shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingEnabled() {
+        final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.Long())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void 
shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingDisabled() {
+        final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+                Grouped.with("repartition-name", Serdes.String(), 
Serdes.String()),
+                Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>with(Serdes.String(), Serdes.Long())
+                    .withLoggingDisabled()
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldNotThrowWhenGroupByKeyAggregationWithMaterializedName() {
+        final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+             Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldNotThrowWhenGroupByKeyAggregationWithRepartitionNameAndMaterializedName() 
{
+        final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldThrowWhenGroupByKeyAggregationWithoutRepartitionNameAndMaterializedName() 
{
+        final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+             Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.Long())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+    }
+
+    private StreamsBuilder buildWithGroupByKeyAggregationTopology(final 
Grouped<String, String> grouped,
+                                                                  final 
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream("input1");
+        stream
+            .selectKey((k, v) -> v)
+            .groupByKey(grouped)
+            .count(materialized)
+            .toStream()
+            .to("output", Produced.as("sink"));
+        return builder;
+    }
+
+    @Test
+    void shouldNotThrowWhenSuppressWithSuppressName() {
+        final StreamsBuilder builder = 
buildAggregationWithSuppressTopology(true, true);
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldThrowWhenSuppressWithoutSuppressName() {
+        final StreamsBuilder builder = 
buildAggregationWithSuppressTopology(false, true);
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KTABLE-SUPPRESS-STATE-STORE-0000000003-changelog"));
+        assertFalse(e.getMessage().contains("Following state store(s) has not 
been named"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldThrowWhenSuppressWithoutSuppressNameAndLoggingDisabled() {
+        final StreamsBuilder builder = 
buildAggregationWithSuppressTopology(false, false);
+        assertBuildDoesNotThrow(builder);
+    }
+
+    private StreamsBuilder buildAggregationWithSuppressTopology(final boolean 
isSuppressNamed,
+                                                                final boolean 
isLoggingEnabled) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream = builder.stream("input1");
+        final KTable<Windowed<String>, Long> table = stream
+            .groupByKey()
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
+            .count(Materialized.as("materialized-name"));
+        if (isSuppressNamed) {
+            
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
+                    .withName("suppressed-name"))
+                .toStream()
+                .to("output", Produced.as("sink"));
+        } else {
+            if (isLoggingEnabled) {
+                
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
+                    .toStream()
+                    .to("output", Produced.as("sink"));
+            } else {
+                
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()))
+                    .toStream()
+                    .to("output", Produced.as("sink"));
+            }
+        }
+        return builder;
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingEnabled() {
+        final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+            StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
+                .withName("repartition-name")
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, 
KSTREAM-OUTEROTHER-0000000013-store-changelog, 
KSTREAM-OUTERSHARED-0000000012-store-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-JOINTHIS-0000000012-store, 
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingDisabled() {
+        final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+                StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
+                    .withName("repartition-name").withLoggingDisabled()
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-JOINTHIS-0000000012-store, 
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldThrowWhenKStreamKStreamJoinWithMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+            StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
+                .withStoreName("store-name")
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertFalse(e.getMessage().contains("Following changelog topic(s) has 
not been named"));
+        assertFalse(e.getMessage().contains("Following state store(s) has not 
been named"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-KEY-SELECT-0000000002-repartition, 
KSTREAM-KEY-SELECT-0000000003-repartition"));
+    }
+
+    @Test
+    void 
shouldNotThrowWhenKStreamKStreamJoinWithRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+            StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
+                .withName("repartition-name")
+                .withStoreName("store-name")
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKStreamJoinWithoutRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+            StreamJoined.with(Serdes.String(), Serdes.String(), 
Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-JOINTHIS-0000000012-store-changelog, 
KSTREAM-OUTEROTHER-0000000013-store-changelog, 
KSTREAM-OUTERSHARED-0000000012-store-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: KSTREAM-JOINTHIS-0000000012-store, 
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-KEY-SELECT-0000000002-repartition, 
KSTREAM-KEY-SELECT-0000000003-repartition"));
+    }
+
+    private StreamsBuilder buildKStreamKStreamJoinTopology(final 
StreamJoined<String, String, String> streamJoined) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KStream<String, String> streamTwo = 
builder.stream(STREAM_TOPIC_TWO);
+        streamOne
+            .selectKey((k, v) -> v)
+            .leftJoin(
+                streamTwo.selectKey((k, v) -> v),
+                (value1, value2) -> value1,
+                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+                streamJoined
+            );
+        return builder;
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingEnabled() {
+        final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String()).withName("repartition-name"),
+            Materialized.with(Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: stream-topic-two-STATE-STORE-0000000001"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingDisabled() {
+        final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+                Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String()).withName("repartition-name"),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(Serdes.String(), Serdes.String())
+                    .withLoggingDisabled()
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: stream-topic-two-STATE-STORE-0000000001"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldThrowWhenKStreamKTableJoinWithMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertFalse(e.getMessage().contains("Following changelog topic(s) has 
not been named"));
+        assertFalse(e.getMessage().contains("Following state store(s) has not 
been named"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
+    }
+
+    @Test
+    void 
shouldNotThrowWhenKStreamKTableJoinWithRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String()).withName("repartition-name"),
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                    
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void 
shouldThrowWhenKStreamKTableJoinWithoutRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: stream-topic-two-STATE-STORE-0000000001"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-KEY-SELECT-0000000004-repartition"));
+    }
+
+    private StreamsBuilder buildKStreamKTableJoinTopology(final Joined<String, 
String, String> joined, 
+                                                          final 
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+        final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO, 
materialized);
+        stream
+            .selectKey((k, v) -> v)
+            .join(
+                table,
+                (value1, value2) -> value1,
+                joined
+            );
+        return builder;
+    }
+
+
+    @Test
+    void shouldNotThrowWhenKStreamVersionedKTableJoinWithRepartitionName() {
+        final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String()).withName("repartition-name")
+
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldThrowWhenKStreamVersionedKTableJoinWithoutRepartitionName() {
+        final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
+            Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-JOIN-0000000007-Buffer-changelog"));
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: KSTREAM-JOIN-0000000007-Buffer"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
+    }
+
+    private StreamsBuilder buildKStreamVersionedKTableJoinTopology(final 
Joined<String, String, String> joined) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final VersionedBytesStoreSupplier versionedStoreSupplier =
+                
Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
+                        Duration.ofDays(1));
+        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> 
materialized =
+            Materialized.<String, String>as(versionedStoreSupplier)
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.String());
+        final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+        final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO, 
materialized);
+        stream
+            .selectKey((k, v) -> v)
+            .join(
+                table,
+                (value1, value2) -> value1,
+                joined.withGracePeriod(Duration.ofHours(1))
+            )
+                .to("test-topic");
+        return builder;
+    }
+
+    @Test
+    void shouldNotThrowWhenKStreamGlobalKTableJoinWithMaterializedName() {
+        final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology(
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldThrowWhenKStreamGlobalKTableJoinWithoutStoreName() {
+        final StreamsBuilder builder = 
buildKStreamGlobalKTableJoinTopology(null);
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertFalse(e.getMessage().contains("Following changelog topic(s) has 
not been named"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: stream-topic-two-STATE-STORE-0000000001"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+
+    }
+
+    private StreamsBuilder buildKStreamGlobalKTableJoinTopology(final 
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+        final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+        final GlobalKTable<String, String> globalTable;
+        if (materialized != null) {
+            globalTable = builder.globalTable(STREAM_TOPIC_TWO, materialized);
+        } else {
+            globalTable = builder.globalTable(STREAM_TOPIC_TWO);
+        }
+        stream
+            .selectKey((k, v) -> v)
+            .join(
+                globalTable,
+                (k, v) -> k,
+                (value1, value2) -> value1
+            );
+        return builder;
+    }
+
+    @Test
+    void shouldNotThrowWhenRepartitionWithRepartitionName() {
+        final StreamsBuilder builder = buildRepartitionTopology(
+            Repartitioned.with(Serdes.String(), Serdes.String())
+                .withName("repartition-name")
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldThrowWhenRepartitionWithoutRepartition() {
+        final StreamsBuilder builder = buildRepartitionTopology(
+            Repartitioned.with(Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertFalse(e.getMessage().contains("Following changelog topic(s) has 
not been named"));
+        assertFalse(e.getMessage().contains("Following state store(s) has not 
been named"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: KSTREAM-REPARTITION-0000000001-repartition"));
+    }
+
+    private StreamsBuilder buildRepartitionTopology(final 
Repartitioned<String, String> repartitioned) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> stream = builder.stream("input1");
+        stream
+            .repartition(repartitioned)
+            .to("output", Produced.as("sink"));
+        return builder;
+    }
+
+    @Test
+    void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingEnabled() {
+        final StreamsBuilder builder = buildCoGroupTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingDisabled() {
+        final StreamsBuilder builder = buildCoGroupTopology(
+                Grouped.with("repartition-name", Serdes.String(), 
Serdes.String()),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(Serdes.String(), Serdes.String())
+                    .withLoggingDisabled()
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertFalse(e.getMessage().contains("Following repartition topic(s) 
has not been named"));
+    }
+
+    @Test
+    void shouldNotThrowWhenCoGroupWithMaterializedName() {
+        final StreamsBuilder builder = buildCoGroupTopology(
+            Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldNotThrowWhenCoGroupWithRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildCoGroupTopology(
+            Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("materialized-name")
+                .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+        );
+        assertBuildDoesNotThrow(builder);
+    }
+
+    @Test
+    void shouldThrowWhenCoGroupWithoutRepartitionNameAndMaterializedName() {
+        final StreamsBuilder builder = buildCoGroupTopology(
+            Grouped.with(Serdes.String(), Serdes.String()),
+            Materialized.with(Serdes.String(), Serdes.String())
+        );
+        final TopologyException e = assertThrows(TopologyException.class, 
builder::build);
+        assertTrue(e.getMessage().contains("Following changelog topic(s) has 
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+        assertTrue(e.getMessage().contains("Following state store(s) has not 
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+        assertTrue(e.getMessage().contains("Following repartition topic(s) has 
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+    }
+
+    private StreamsBuilder buildCoGroupTopology(final Grouped<String, String> 
grouped,
+                                                final Materialized<String, 
String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+        final KStream<String, String> streamTwo = 
builder.stream(STREAM_TOPIC_TWO);
+
+        final KGroupedStream<String, String> groupedOne = 
streamOne.groupBy((k, v) -> v, grouped);
+        final KGroupedStream<String, String> groupedTwo = 
streamTwo.groupByKey();
+
+        final Aggregator<String, String, String> agg1 = (key, value, 
aggregate) -> aggregate + value;
+        final Aggregator<String, String, String> agg2 = (key, value, 
aggregate) -> aggregate + value;
+
+        final KTable<String, String> coGroupedStream = groupedOne
+            .cogroup(agg1)
+            .cogroup(groupedTwo, agg2)
+            .aggregate(() -> "", materialized);
+
+        coGroupedStream.toStream().to("output");
+
+        return builder;
+    }
+
     private static void assertBuildDoesNotThrow(final StreamsBuilder builder) {
         try {
             builder.build();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ce99358ad6d..7e9447851e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -69,6 +69,7 @@ import static 
org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
 import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
 import static 
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
 import static 
org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
 import static 
org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
 import static 
org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
@@ -129,6 +130,7 @@ public class StreamsConfigTest {
                     case "DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC":
                     case "DSL_STORE_SUPPLIERS_CLASS_DOC":
                     case "PROCESSOR_WRAPPER_CLASS_DOC":
+                    case "ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC":
                         continue;
 
                     // check for leaking, but already deprecated members
@@ -1582,6 +1584,18 @@ public class StreamsConfigTest {
         assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class, 
streamsConfig.productionExceptionHandler().getClass());
     }
 
+    @Test
+    public void shouldGetDefaultEnsureExplicitInternalResourceNaming() {
+        
assertFalse(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
+    }
+
+    @Test
+    public void shouldEnsureExplicitInternalResourceNaming() {
+        props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+        streamsConfig = new StreamsConfig(props);
+        
assertTrue(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean 
isKey) {


Reply via email to