This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ce5f23295c KAFKA-18023: Enforcing Explicit Naming for Kafka Streams
Internal Topics (#18233)
3ce5f23295c is described below
commit 3ce5f23295c6d6877285fb9d45717b290554d25a
Author: Sebastien Viale <[email protected]>
AuthorDate: Mon Feb 24 11:41:42 2025 +0100
KAFKA-18023: Enforcing Explicit Naming for Kafka Streams Internal Topics
(#18233)
This pull request implements KIP-1111, which aims to add a configuration
that prevents a Kafka Streams application from starting if any of its
internal topics have auto-generated names, thereby enforcing explicit
naming for all internal topics and enhancing the stability of the
application’s topology.
- Repartition Topics:
All repartition topics are created in the
KStreamImpl.createRepartitionedSource(...) static method. This method
either receives a name explicitly provided by the user or null and then
builds the final repartition topic name.
- Changelog Topics and State Store Names:
There are several scenarios where these are created:
In the MaterializedInternal constructor.
During KStream/KStream joins.
During KStream/KTable joins with grace periods.
When key-value buffers are used in suppressions.
Reviewers: Lucas Brutschy <[email protected]>, Sophie Blee-Goldman
<[email protected]>
---
docs/streams/developer-guide/config-streams.html | 84 ++--
.../developer-guide/dsl-topology-naming.html | 13 +
docs/streams/upgrade-guide.html | 10 +
.../org/apache/kafka/streams/StreamsBuilder.java | 4 +-
.../org/apache/kafka/streams/StreamsConfig.java | 11 +
.../org/apache/kafka/streams/TopologyConfig.java | 13 +-
.../internals/CogroupedStreamAggregateBuilder.java | 20 +-
.../internals/GroupedStreamAggregateBuilder.java | 10 +-
.../kstream/internals/InternalStreamsBuilder.java | 6 +-
.../streams/kstream/internals/KStreamImpl.java | 42 +-
.../streams/kstream/internals/KStreamImplJoin.java | 17 +
.../streams/kstream/internals/KTableImpl.java | 12 +-
.../kstream/internals/MaterializedInternal.java | 8 +
.../internals/InternalResourcesNaming.java | 73 +++
.../internals/InternalTopologyBuilder.java | 52 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 553 +++++++++++++++++++++
.../apache/kafka/streams/StreamsConfigTest.java | 14 +
17 files changed, 881 insertions(+), 61 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html
b/docs/streams/developer-guide/config-streams.html
index a7d82b97ccd..7f9aaa60616 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -79,6 +79,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#default-value-serde"
id="id9">default.value.serde</a></li>
<li><a class="reference internal"
href="#deserialization-exception-handler"
id="id7">deserialization.exception.handler</a></li>
<li><a class="reference internal" href="#enable-metrics-push"
id="id43">enable.metrics.push</a></li>
+ <li><a class="reference internal"
href="#ensure-explicit-internal-resource-naming"
id="id46">ensure.explicit.internal.resource.naming</a></li>
<li><a class="reference internal"
href="#log-summary-interval-ms" id="id40">log.summary.interval.ms</a></li>
<li><a class="reference internal" href="#max-task-idle-ms"
id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas"
id="id29">max.warmup.replicas</a></li>
@@ -348,17 +349,26 @@
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
</tr>
- <tr class="row-odd"><td>log.summary.interval.ms</td>
+ <tr class="row-odd"><td>ensure.explicit.internal.resource.naming</td>
+ <td>High</td>
+ <td colspan="2">
+ Whether to enforce explicit naming for all internal resources of
the topology, including internal
+ topics (e.g., changelog and repartition topics) and their
associated state stores.
+ When enabled, the application will refuse to start if any
internal resource has an auto-generated name.
+ </td>
+ <td><code class="docutils literal"><span
class="pre">false</span></code></td>
+ </tr>
+ <tr class="row-even"><td>log.summary.interval.ms</td>
<td>Low</td>
<td colspan="2">The output interval in milliseconds for logging
summary information (disabled if negative).</td>
<td><code class="docutils literal"><span
class="pre">120000</span></code> (2 minutes)</td>
</tr>
- <tr class="row-even"><td>enable.metrics.push</td>
+ <tr class="row-odd"><td>enable.metrics.push</td>
<td>Low</td>
<td colspan="2">Whether to enable pushing of client metrics to the
cluster, if the cluster has a client metrics subscription which matches this
client.</td>
<td><code class="docutils literal"><span
class="pre">true</span></code></td>
</tr>
- <tr class="row-odd"><td>max.task.idle.ms</td>
+ <tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">
<p>
@@ -377,76 +387,76 @@
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td>
<td><code class="docutils literal"><span
class="pre">0</span></code></td>
</tr>
- <tr class="row-even"><td>max.warmup.replicas</td>
+ <tr class="row-odd"><td>max.warmup.replicas</td>
<td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra
standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span
class="pre">2</span></code></td>
</tr>
- <tr class="row-odd"><td>metric.reporters</td>
+ <tr class="row-even"><td>metric.reporters</td>
<td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td>
</tr>
- <tr class="row-even"><td>metrics.num.samples</td>
+ <tr class="row-odd"><td>metrics.num.samples</td>
<td>Low</td>
<td colspan="2">The number of samples maintained to compute
metrics.</td>
<td><code class="docutils literal"><span
class="pre">2</span></code></td>
</tr>
- <tr class="row-odd"><td>metrics.recording.level</td>
+ <tr class="row-even"><td>metrics.recording.level</td>
<td>Low</td>
<td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span
class="pre">INFO</span></code></td>
</tr>
- <tr class="row-even"><td>metrics.sample.window.ms</td>
+ <tr class="row-odd"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics
sample is computed over.</td>
<td><code class="docutils literal"><span
class="pre">30000</span></code> (30 seconds)</td>
</tr>
- <tr class="row-odd"><td>num.standby.replicas</td>
+ <tr class="row-even"><td>num.standby.replicas</td>
<td>High</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span
class="pre">0</span></code></td>
</tr>
- <tr class="row-even"><td>num.stream.threads</td>
+ <tr class="row-odd"><td>num.stream.threads</td>
<td>Medium</td>
<td colspan="2">The number of threads to execute stream
processing.</td>
<td><code class="docutils literal"><span
class="pre">1</span></code></td>
</tr>
- <tr class="row-odd"><td>probing.rebalance.interval.ms</td>
+ <tr class="row-even"><td>probing.rebalance.interval.ms</td>
<td>Low</td>
<td colspan="2">The maximum time in milliseconds to wait before
triggering a rebalance to probe for warmup replicas that have sufficiently
caught up.</td>
<td><code class="docutils literal"><span
class="pre">600000</span></code> (10 minutes)</td>
</tr>
- <tr class="row-even"><td>processing.exception.handler</td>
+ <tr class="row-odd"><td>processing.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code
class="docutils literal"><span
class="pre">ProcessingExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span
class="pre">LogAndFailProcessingExceptionHandler</span></code></td>
</tr>
- <tr class="row-odd"><td>processing.guarantee</td>
+ <tr class="row-even"><td>processing.guarantee</td>
<td>Medium</td>
<td colspan="2">The processing mode. Can be either <code
class="docutils literal"><span class="pre">"at_least_once"</span></code>
or <code class="docutils literal"><span
class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker
version 2.5+).
See <a class="reference internal"
href="#streams-developer-guide-processing-guarantee"><span class="std
std-ref">Processing Guarantee</span></a>.</td>.
<td><code class="docutils literal"><span
class="pre">"at_least_once"</span></code></td>
</tr>
- <tr class="row-even"><td>processor.wrapper.class</td>
+ <tr class="row-odd"><td>processor.wrapper.class</td>
<td>Medium</td>
<td colspan="2">A class or class name implementing the <code
class="docutils literal"><span class="pre">ProcessorWrapper</span></code>
interface.
Must be passed in when creating the topology, and will not be
applied unless passed in to the appropriate constructor as a TopologyConfig.
You should
use the <code class="docutils literal"><span
class="pre">StreamsBuilder#new(TopologyConfig)</span></code> constructor for
DSL applications, and the
<code class="docutils literal"><span
class="pre">Topology#new(TopologyConfig)</span></code> constructor for PAPI
applications.</td>
</tr>
- <tr class="row-odd"><td>production.exception.handler</td>
+ <tr class="row-even"><td>production.exception.handler</td>
<td>Medium</td>
<td colspan="2">Exception handling class that implements the <code
class="docutils literal"><span
class="pre">ProductionExceptionHandler</span></code> interface.</td>
<td><code class="docutils literal"><span
class="pre">DefaultProductionExceptionHandler</span></code></td>
</tr>
- <tr class="row-even"><td>poll.ms</td>
+ <tr class="row-odd"><td>poll.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to block
waiting for input.</td>
<td><code class="docutils literal"><span
class="pre">100</span></code></td>
</tr>
- <tr class="row-odd"><td>rack.aware.assignment.strategy</td>
+ <tr class="row-even"><td>rack.aware.assignment.strategy</td>
<td>Low</td>
<td colspan="2">The strategy used for rack aware assignment.
Acceptable value are
<code class="docutils literal"><span
class="pre">"none"</span></code> (default),
@@ -455,7 +465,7 @@
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
See <a class="reference internal"
href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware
Assignment Strategy</span></a>.</td>
<td><code class="docutils literal"><span
class="pre">"none"</span></code></td>
</tr>
- <tr class="row-even><td>rack.aware.assignment.tags</td>
+ <tr class="row-odd"><td>rack.aware.assignment.tags</td>
<td>Low</td>
<td colspan="2">List of tag keys used to distribute standby
replicas across Kafka Streams
clients. When configured, Kafka Streams will make a best-effort
to distribute the standby tasks over
@@ -463,72 +473,72 @@
streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
See <a class="reference internal"
href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware
Assignment Tags</span></a>.</td>
<td>the empty list</td>
</tr>
- <tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
+ <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td>
<td colspan="2">Cost associated with moving tasks from existing
assignment.
See <a class="reference internal"
href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack
Aware Assignment Non-Overlap-Cost</span></a>.</td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
</tr>
- <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td>
+ <tr class="row-odd"><td>rack.aware.assignment.non_overlap_cost</td>
<td>Low</td>
<td colspan="2">Cost associated with cross rack traffic.
See <a class="reference internal"
href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware
Assignment Traffic-Cost</span></a>.</td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
</tr>
- <tr class="row-odd"><td>replication.factor</td>
+ <tr class="row-even"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and
repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default
replication factor) requires broker version 2.4 or newer.</td>
<td><code class="docutils literal"><span
class="pre">-1</span></code></td>
</tr>
- <tr class="row-even"><td>retry.backoff.ms</td>
+ <tr class="row-odd"><td>retry.backoff.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds, before a
request is retried.</td>
<td><code class="docutils literal"><span
class="pre">100</span></code></td>
</tr>
- <tr class="row-odd"><td>rocksdb.config.setter</td>
+ <tr class="row-even"><td>rocksdb.config.setter</td>
<td>Medium</td>
<td colspan="2">The RocksDB configuration.</td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
</tr>
- <tr class="row-even"><td>state.cleanup.delay.ms</td>
+ <tr class="row-odd"><td>state.cleanup.delay.ms</td>
<td>Low</td>
<td colspan="2">The amount of time in milliseconds to wait before
deleting state when a partition has migrated.</td>
<td><code class="docutils literal"><span
class="pre">600000</span></code></td> (10 minutes)</td>
</tr>
- <tr class="row-odd"><td>state.dir</td>
+ <tr class="row-even"><td>state.dir</td>
<td>High</td>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span
class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
- <tr class="row-even"><td>task.assignor.class</td>
+ <tr class="row-odd"><td>task.assignor.class</td>
<td>Medium</td>
<td colspan="2">A task assignor class or class name implementing
the <code>TaskAssignor</code> interface.</td>
<td>The high-availability task assignor.</td>
</tr>
- <tr class="row-odd"><td>task.timeout.ms</td>
+ <tr class="row-even"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task
might stall due to internal errors and retries until an error is raised. For a
timeout of <code>0 ms</code>, a task would raise an error for the first
internal error. For any timeout larger than <code>0 ms</code>, a task will
retry at least once before an error is raised.</td>
<td><code class="docutils literal"><span
class="pre">300000</span></code></td> (5 minutes)</td>
</tr>
- <tr class="row-even"><td>topology.optimization</td>
+ <tr class="row-odd"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should
optimize the topology and what optimizations to apply. Acceptable values are:
<code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>),
<code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated
list of specific optimizations:
<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code>
(<code>reuse.ktable.source.topics</code>),
<code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.r [...]
<code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code>
(<code>single.store.self.join</code>). </td>
<td><code class="docutils literal"><span
class="pre">"NO_OPTIMIZATION"</span></code></td>
</tr>
- <tr class="row-odd"><td>upgrade.from</td>
+ <tr class="row-even"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a
rolling upgrade.
See <a class="reference internal"
href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade
From</span></a></td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
</tr>
- <tr
class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
+ <tr
class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is
not deleted from the log prematurely. Allows for clock drift.</td>
<td><code class="docutils literal"><span
class="pre">86400000</span></code></td> (1 day)</td>
</tr>
- <tr class="row-odd"><td>window.size.ms</td>
+ <tr class="row-even"><td>window.size.ms</td>
<td>Low</td>
<td colspan="2">Sets window size for the deserializer in order to
calculate window end times.</td>
<td><code class="docutils literal"><span
class="pre">null</span></code></td>
@@ -753,6 +763,18 @@
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<p>This is discussed in more detail in <a class="reference
internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std
std-ref">Data types and serialization</span></a>.</p>
</div></blockquote>
</div>
+ <div class="section" id="ensure-explicit-internal-resource-naming">
+ <h4><a class="toc-backref"
href="#id46">ensure.explicit.internal.resource.naming</a><a class="headerlink"
href="#ensure-explicit-internal-resource-naming" title="Permalink to this
headline"></a></h4>
+ <blockquote>
+ <div>
+ <p>
+ Whether to enforce explicit naming for all internal resources
of the topology, including internal
+ topics (e.g., changelog and repartition topics) and their
associated state stores.
+ When enabled, the application will refuse to start if any
internal resource has an auto-generated name.
+ </p>
+ </div>
+ </blockquote>
+ </div>
<div class="section" id="rack-aware-assignment-non-overlap-cost">
<h4><a class="toc-backref"
href="#id37">rack.aware.assignment.non_overlap_cost</a><a class="headerlink"
href="#rack-aware-assignment-non-overlap-cost" title="Permalink to this
headline"></a></h4>
<blockquote>
diff --git a/docs/streams/developer-guide/dsl-topology-naming.html
b/docs/streams/developer-guide/dsl-topology-naming.html
index ec3bc857c10..832806050a5 100644
--- a/docs/streams/developer-guide/dsl-topology-naming.html
+++ b/docs/streams/developer-guide/dsl-topology-naming.html
@@ -300,6 +300,19 @@ stream.filter((k, v) -> v != null && v.length() >= 6)
<td>Stream/Table non-stateful
operations</td><td>Named</td>
</tr>
</table>
+
+ To further enforce best practices, Kafka Streams provides a
configuration option,
+ <code class="docutils literal"><span
class="pre">ensure.explicit.internal.resource.naming</span></code>:
+ <pre class="line-numbers"><code class="language-java">/
+ Properties props = new Properties();
+
props.put(StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ </code></pre>
+ This parameter ensures that all internal topics, state
stores, and changelog topics have explicitly defined names. When this
configuration
+ is enabled, a Kafka Streams application will not start if any
of these components rely on auto-generated names. This guarantees
+ stability across topology updates, as manually defined names
remain unchanged even when new processors or transformations are added.
+ Enforcing explicit naming is particularly important in
production environments, where consistency and backward compatibility are
essential
+ for maintaining reliable stream processing applications.
+
</p>
</div>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 89fd62fd9d6..257187bfa45 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -139,6 +139,16 @@
More details about the new config
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>
+ <h3><a id="streams_api_changes_410"
href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
+
+ <p>
+ The introduction of <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1111:+Enforcing+Explicit+Naming+for+Kafka+Streams+Internal+Topics">KIP-1111</a>
+ enables you to enforce explicit naming for all internal resources of
the topology, including internal topics (e.g., changelog and repartition
topics) and their associated state stores.
+ This ensures that every internal resource is named before the Kafka
Streams application is deployed, which is essential for upgrading your topology.
+ You can enable this feature via <code>StreamsConfig</code> using the
<code>StreamsConfig#ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG</code>
parameter.
+ When set to <code>true</code>, the application will refuse to start if
any internal resource has an auto-generated name.
+ </p>
+
<h3><a id="streams_api_changes_400"
href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
<p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 7037e8d7fd3..e56a4cbfb4e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -384,7 +384,7 @@ public class StreamsBuilder {
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materializedInternal =
new MaterializedInternal<>(
- Materialized.with(consumedInternal.keySerde(),
consumedInternal.valueSerde()),
+ Materialized.<K, V, KeyValueStore<Bytes,
byte[]>>with(consumedInternal.keySerde(),
consumedInternal.valueSerde()).withLoggingDisabled(),
internalStreamsBuilder,
topic + "-",
true /* force materializing global tables */);
@@ -457,7 +457,7 @@ public class StreamsBuilder {
Objects.requireNonNull(materialized, "materialized can't be null");
final ConsumedInternal<K, V> consumedInternal = new
ConsumedInternal<>(consumed);
// always use the serdes from consumed
-
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde());
+
materialized.withKeySerde(consumedInternal.keySerde()).withValueSerde(consumedInternal.valueSerde()).withLoggingDisabled();
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>
materializedInternal =
new MaterializedInternal<>(materialized, internalStreamsBuilder,
topic + "-");
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6f0e21025c3..fa9e1bd48f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -592,6 +592,12 @@ public class StreamsConfig extends AbstractConfig {
public static final String ENABLE_METRICS_PUSH_DOC = "Whether to enable
pushing of internal client metrics for (main, restore, and global) consumers,
producers, and admin clients." +
" The cluster must have a client metrics subscription which
corresponds to a client.";
+ /** {@code ensure.explicit.internal.resource.naming} */
+ public static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG
= "ensure.explicit.internal.resource.naming";
+ static final String ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC =
"Whether to enforce explicit naming for all internal resources of the topology,
including internal" +
+ " topics (e.g., changelog and repartition topics) and their associated
state stores." +
+ " When enabled, the application will refuse to start if any internal
resource has an auto-generated name.";
+
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG =
"log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output
interval in milliseconds for logging summary information.\n" +
@@ -869,6 +875,11 @@ public class StreamsConfig extends AbstractConfig {
Importance.HIGH,
STATE_DIR_DOC,
"${java.io.tmpdir}")
+ .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.HIGH,
+ ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC)
// MEDIUM
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
index e96d5281d09..da8c246b26d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -55,6 +55,8 @@ import static
org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_H
import static
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
@@ -142,7 +144,12 @@ public final class TopologyConfig extends AbstractConfig {
Type.CLASS,
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
- DSL_STORE_SUPPLIERS_CLASS_DOC);
+ DSL_STORE_SUPPLIERS_CLASS_DOC)
+ .define(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.HIGH,
+ ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC);
}
private static final Logger log =
LoggerFactory.getLogger(TopologyConfig.class);
@@ -164,6 +171,8 @@ public final class TopologyConfig extends AbstractConfig {
public final Supplier<DeserializationExceptionHandler>
deserializationExceptionHandlerSupplier;
public final Supplier<ProcessingExceptionHandler>
processingExceptionHandlerSupplier;
+ public final boolean ensureExplicitInternalResourceNaming;
+
public TopologyConfig(final StreamsConfig configs) {
this(null, configs, mkObjectProperties(configs.originals()));
}
@@ -272,6 +281,8 @@ public final class TopologyConfig extends AbstractConfig {
} else {
dslStoreSuppliers =
globalAppConfigs.getClass(DSL_STORE_SUPPLIERS_CLASS_CONFIG);
}
+
+ ensureExplicitInternalResourceNaming =
globalAppConfigs.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG);
}
@Deprecated
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
index 2bd81e43d27..f41f52dac8b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
@@ -59,7 +59,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final boolean isOutputVersioned) {
- processRepartitions(groupPatterns, storeFactory.storeName());
+ processRepartitions(groupPatterns, storeFactory.storeName(),
queryableName);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
@@ -94,7 +94,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
- processRepartitions(groupPatterns, storeFactory.storeName());
+ processRepartitions(groupPatterns, storeFactory.storeName(),
queryableName);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
@@ -135,7 +135,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
- processRepartitions(groupPatterns, storeFactory.storeName());
+ processRepartitions(groupPatterns, storeFactory.storeName(),
queryableName);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
int counter = 0;
@@ -175,7 +175,7 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
- processRepartitions(groupPatterns, storeFactory.storeName());
+ processRepartitions(groupPatterns, storeFactory.storeName(),
queryableName);
final Collection<KStreamAggProcessorSupplier> parentProcessors = new
ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
int counter = 0;
@@ -206,7 +206,8 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
}
private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>,
Aggregator<? super K, ? super Object, VOut>> groupPatterns,
- final String storeName) {
+ final String storeName,
+ final String queryableName) {
for (final KGroupedStreamImpl<K, ?> repartitionReqs :
groupPatterns.keySet()) {
if (repartitionReqs.repartitionRequired) {
@@ -216,8 +217,9 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
final String repartitionNamePrefix =
repartitionReqs.userProvidedRepartitionTopicName != null ?
repartitionReqs.userProvidedRepartitionTopicName :
storeName;
- createRepartitionSource(repartitionNamePrefix,
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde);
+ final boolean isRepartitionTopicNameProvidedByUser =
repartitionReqs.userProvidedRepartitionTopicName != null || queryableName !=
null;
+ createRepartitionSource(repartitionNamePrefix,
repartitionNodeBuilder, repartitionReqs.keySerde, repartitionReqs.valueSerde,
isRepartitionTopicNameProvidedByUser);
if (!parentNodes.containsKey(repartitionReqs)) {
final GraphNode repartitionNode =
repartitionNodeBuilder.build();
builder.addGraphNode(repartitionReqs.graphNode,
repartitionNode);
@@ -270,14 +272,16 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
private <VIn> void createRepartitionSource(final String
repartitionTopicNamePrefix,
final
OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
final Serde<K> keySerde,
- final Serde<?> valueSerde) {
+ final Serde<?> valueSerde,
+ final boolean
isRepartitionTopicNameProvidedByUser) {
KStreamImpl.createRepartitionedSource(builder,
keySerde,
(Serde<VIn>) valueSerde,
repartitionTopicNamePrefix,
null,
- (OptimizableRepartitionNodeBuilder<K, VIn>)
optimizableRepartitionNodeBuilder);
+ (OptimizableRepartitionNodeBuilder<K, VIn>)
optimizableRepartitionNodeBuilder,
+ isRepartitionTopicNameProvidedByUser);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
index b99034c5306..023513d0704 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -126,8 +126,10 @@ class GroupedStreamAggregateBuilder<K, V> {
if (repartitionRequired) {
final OptimizableRepartitionNodeBuilder<K, V>
repartitionNodeBuilder = optimizableRepartitionNodeBuilder();
+
final String repartitionTopicPrefix =
userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName :
storeName;
- sourceName = createRepartitionSource(repartitionTopicPrefix,
repartitionNodeBuilder);
+
+ sourceName = createRepartitionSource(repartitionTopicPrefix,
repartitionNodeBuilder, userProvidedRepartitionTopicName != null ||
queryableStoreName != null);
// First time through we need to create a repartition node.
// Any subsequent calls to GroupedStreamAggregateBuilder#build we
check if
@@ -157,14 +159,16 @@ class GroupedStreamAggregateBuilder<K, V> {
* @return the new sourceName of the repartitioned source
*/
private String createRepartitionSource(final String
repartitionTopicNamePrefix,
- final
OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder) {
+ final
OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder,
+ final boolean
isRepartitionTopicNameProvidedByUser) {
return KStreamImpl.createRepartitionedSource(builder,
keySerde,
valueSerde,
repartitionTopicNamePrefix,
null,
-
optimizableRepartitionNodeBuilder);
+
optimizableRepartitionNodeBuilder,
+
isRepartitionTopicNameProvidedByUser);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index c1a5bff2e05..968276bd501 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -328,6 +328,9 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
}
}
internalTopologyBuilder.validateCopartition();
+
+ internalTopologyBuilder.checkUnprovidedNames();
+
}
/**
@@ -588,7 +591,8 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
valueSerde,
repartitionTopicName,
null,
- repartitionNodeBuilder
+ repartitionNodeBuilder,
+ true
);
// ensures setting the repartition topic to the name of the
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8fb1327955e..1927aed03fa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -58,6 +58,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -525,7 +526,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
valueSerde,
name,
repartitionedInternal.streamPartitioner(),
-
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties)
+
unoptimizableRepartitionNodeBuilder.withInternalTopicProperties(internalTopicProperties),
+ repartitionedInternal.name() != null
);
final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode
= unoptimizableRepartitionNodeBuilder.build();
@@ -633,7 +635,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
valueSerdeOverride,
name,
null,
- repartitionNodeBuilder
+ repartitionNodeBuilder,
+ namedInternal.name() != null
);
tableParentNode = repartitionNodeBuilder.build();
@@ -895,21 +898,23 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
if (joinThis.repartitionRequired) {
final String joinThisName = joinThis.name;
final String leftJoinRepartitionTopicName =
name.suffixWithOrElseGet("-left", joinThisName);
+
joinThis = joinThis.repartitionForJoin(
leftJoinRepartitionTopicName,
streamJoinedInternal.keySerde(),
- streamJoinedInternal.valueSerde()
- );
+ streamJoinedInternal.valueSerde(),
+ name.name() != null);
}
if (joinOther.repartitionRequired) {
final String joinOtherName = joinOther.name;
final String rightJoinRepartitionTopicName =
name.suffixWithOrElseGet("-right", joinOtherName);
+
joinOther = joinOther.repartitionForJoin(
rightJoinRepartitionTopicName,
streamJoinedInternal.keySerde(),
- streamJoinedInternal.otherValueSerde()
- );
+ streamJoinedInternal.otherValueSerde(),
+ name.name() != null);
}
joinThis.ensureCopartitionWith(Collections.singleton(joinOther));
@@ -928,7 +933,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
*/
private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
final Serde<K>
keySerdeOverride,
- final Serde<V>
valueSerdeOverride) {
+ final Serde<V>
valueSerdeOverride,
+ final boolean
isRepartitionTopicNameProvidedByUser) {
final Serde<K> repartitionKeySerde = keySerdeOverride != null ?
keySerdeOverride : keySerde;
final Serde<V> repartitionValueSerde = valueSerdeOverride != null ?
valueSerdeOverride : valueSerde;
final OptimizableRepartitionNodeBuilder<K, V>
optimizableRepartitionNodeBuilder =
@@ -942,7 +948,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
repartitionValueSerde,
repartitionName,
null,
- optimizableRepartitionNodeBuilder);
+ optimizableRepartitionNodeBuilder,
+ isRepartitionTopicNameProvidedByUser);
if (repartitionNode == null || !name.equals(repartitionName)) {
repartitionNode = optimizableRepartitionNodeBuilder.build();
@@ -965,11 +972,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
final Serde<Value> valueSerde,
final String repartitionTopicNamePrefix,
final StreamPartitioner<Key, Value> streamPartitioner,
- final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode>
baseRepartitionNodeBuilder
- ) {
+ final BaseRepartitionNodeBuilder<Key, Value, RepartitionNode>
baseRepartitionNodeBuilder,
+ final boolean isRepartitionTopicNameProvidedByUser) {
+
final String repartitionTopicName =
repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX) ?
repartitionTopicNamePrefix :
repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
+ if (!isRepartitionTopicNameProvidedByUser) {
+
builder.internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withRepartitionTopic(repartitionTopicName).build());
+ }
// Always need to generate the names to burn index counter for
compatibility
final String genSinkName = builder.newProcessorName(SINK_NAME);
@@ -1051,7 +1062,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
final KStreamImpl<K, V> thisStreamRepartitioned =
repartitionForJoin(
name != null ? name : this.name,
joinedInternal.keySerde(),
- joinedInternal.leftValueSerde()
+ joinedInternal.leftValueSerde(),
+ name != null
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner,
joinedInternal, false);
} else {
@@ -1091,7 +1103,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
final KStreamImpl<K, V> thisStreamRepartitioned =
repartitionForJoin(
name != null ? name : this.name,
joinedInternal.keySerde(),
- joinedInternal.leftValueSerde()
+ joinedInternal.leftValueSerde(),
+ name != null
);
return thisStreamRepartitioned.doStreamTableJoin(table, joiner,
joinedInternal, true);
} else {
@@ -1124,6 +1137,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
joinedInternal.gracePeriod(),
name)
);
+
+ if (joinedInternal.name() == null) {
+ final InternalResourcesNaming internalResourcesNaming =
InternalResourcesNaming.builder().withStateStore(bufferName).withChangelogTopic(bufferName
+ "-changelog").build();
+
internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming);
+ }
}
final ProcessorSupplier<K, V, K, VOut> processorSupplier = new
KStreamKTableJoin<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
index aeece23cf34..902079863fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
@@ -30,6 +30,7 @@ import
org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import
org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.Stores;
@@ -168,6 +169,14 @@ class KStreamImplJoin {
);
}
+ if (userProvidedBaseStoreName == null) {
+ addInternalResourceName(thisWindowStore);
+ addInternalResourceName(otherWindowStore);
+ if (outerJoinWindowStore.isPresent()) {
+ addInternalResourceName(outerJoinWindowStore.get());
+ }
+ }
+
// Time-shared between joins to keep track of the maximum stream time
final TimeTrackerSupplier sharedTimeTrackerSupplier = new
TimeTrackerSupplier();
@@ -261,4 +270,12 @@ class KStreamImplJoin {
valueSerde
));
}
+
+ private void addInternalResourceName(final StoreFactory windowStore) {
+ final InternalResourcesNaming.Builder thisInternalResourcesNaming =
InternalResourcesNaming.builder().withStateStore(windowStore.storeName());
+ if (windowStore.loggingEnabled()) {
+
thisInternalResourcesNaming.withChangelogTopic(windowStore.storeName() +
"-changelog");
+ }
+
builder.internalTopologyBuilder().addImplicitInternalNames(thisInternalResourcesNaming.build());
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 7f37a27149e..e9e9035981d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -66,6 +66,7 @@ import
org.apache.kafka.streams.kstream.internals.suppress.NamedSuppressed;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import
org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
@@ -549,8 +550,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K,
V> implements KTable<
final SuppressedInternal<K> suppressedInternal =
buildSuppress(suppressed, name);
- final String storeName =
- suppressedInternal.name() != null ? suppressedInternal.name() +
"-store" : builder.newStoreName(SUPPRESS_NAME);
+ final String storeName;
+ if (suppressedInternal.name() != null) {
+ storeName = suppressedInternal.name() + "-store";
+ } else {
+ storeName = builder.newStoreName(SUPPRESS_NAME);
+ if (suppressedInternal.bufferConfig().isLoggingEnabled()) {
+
internalTopologyBuilder().addImplicitInternalNames(InternalResourcesNaming.builder().withChangelogTopic(storeName
+ "-changelog").build());
+ }
+ }
final StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V,
Change<V>>> storeBuilder;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index d6cd130ba6d..7bd727b09be 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.StoreSupplier;
@@ -53,6 +54,13 @@ public final class MaterializedInternal<K, V, S extends
StateStore> extends Mate
queryable = forceQueryable || storeName() != null;
if (storeName() == null && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
+ if (nameProvider instanceof InternalStreamsBuilder) {
+ final InternalResourcesNaming.Builder internalResourcesNaming
= InternalResourcesNaming.builder().withStateStore(storeName);
+ if (loggingEnabled()) {
+ internalResourcesNaming.withChangelogTopic(storeName +
"-changelog");
+ }
+ ((InternalStreamsBuilder)
nameProvider).internalTopologyBuilder().addImplicitInternalNames(internalResourcesNaming.build());
+ }
}
// if store type is not configured during creating Materialized, then
try to get the topologyConfigs from nameProvider
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
new file mode 100644
index 00000000000..f4b46088f3a
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalResourcesNaming.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+public final class InternalResourcesNaming {
+
+ private final String repartitionTopic;
+ private final String changelogTopic;
+ private final String stateStore;
+
+ private InternalResourcesNaming(final Builder builder) {
+ this.repartitionTopic = builder.repartitionTopic;
+ this.changelogTopic = builder.changelogTopic;
+ this.stateStore = builder.stateStore;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static final class Builder {
+ private String repartitionTopic;
+ private String changelogTopic;
+ private String stateStore;
+
+ private Builder() {}
+
+ public Builder withRepartitionTopic(final String repartitionTopic) {
+ this.repartitionTopic = repartitionTopic;
+ return this;
+ }
+
+ public Builder withChangelogTopic(final String changelogTopic) {
+ this.changelogTopic = changelogTopic;
+ return this;
+ }
+
+ public Builder withStateStore(final String stateStore) {
+ this.stateStore = stateStore;
+ return this;
+ }
+
+ public InternalResourcesNaming build() {
+ return new InternalResourcesNaming(this);
+ }
+ }
+
+ public String repartitionTopic() {
+ return repartitionTopic;
+ }
+
+ public String changelogTopic() {
+ return changelogTopic;
+ }
+
+ public String stateStore() {
+ return stateStore;
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 23f4f343fad..2503a358419 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.TopologyException;
@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -64,12 +66,14 @@ import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
public class InternalTopologyBuilder {
public InternalTopologyBuilder() {
this.topologyName = null;
+ this.ensureExplicitInternalResourceNaming = false;
this.processorWrapper = new NoOpProcessorWrapper();
}
@@ -78,7 +82,7 @@ public class InternalTopologyBuilder {
this.topologyConfigs = topologyConfigs;
this.topologyName = topologyConfigs.topologyName;
-
+ this.ensureExplicitInternalResourceNaming =
topologyConfigs.ensureExplicitInternalResourceNaming;
try {
processorWrapper = topologyConfigs.getConfiguredInstance(
PROCESSOR_WRAPPER_CLASS_CONFIG,
@@ -194,6 +198,10 @@ public class InternalTopologyBuilder {
private boolean hasPersistentStores = false;
+ private final boolean ensureExplicitInternalResourceNaming;
+
+ private final Set<InternalResourcesNaming> implicitInternalNames = new
LinkedHashSet<>();
+
public static class ReprocessFactory<KIn, VIn, KOut, VOut> {
private final ProcessorSupplier<KIn, VIn, KOut, VOut>
processorSupplier;
@@ -2293,4 +2301,46 @@ public class InternalTopologyBuilder {
processorWrapper.wrapProcessorSupplier(name, processorSupplier)
);
}
+
+ public void addImplicitInternalNames(final InternalResourcesNaming
internalResourcesNaming) {
+ implicitInternalNames.add(internalResourcesNaming);
+ }
+
+ public void checkUnprovidedNames() {
+ if (!implicitInternalNames.isEmpty()) {
+ final StringBuilder result = new StringBuilder();
+ final List<String> changelogTopics = new ArrayList<>();
+ final List<String> stateStores = new ArrayList<>();
+ final List<String> repartitionTopics = new ArrayList<>();
+ for (final InternalResourcesNaming internalResourcesNaming :
implicitInternalNames) {
+ if (!Utils.isBlank(internalResourcesNaming.changelogTopic())) {
+
changelogTopics.add(internalResourcesNaming.changelogTopic());
+ }
+ if (!Utils.isBlank(internalResourcesNaming.stateStore())) {
+ stateStores.add(internalResourcesNaming.stateStore());
+ }
+ if
(!Utils.isBlank(internalResourcesNaming.repartitionTopic())) {
+
repartitionTopics.add(internalResourcesNaming.repartitionTopic());
+ }
+ }
+ if (!changelogTopics.isEmpty()) {
+ result.append(String.format("Following changelog topic(s) has
not been named: %s%n", String.join(", ", changelogTopics)));
+ }
+ if (!stateStores.isEmpty()) {
+ result.append(String.format("Following state store(s) has not
been named: %s%n", String.join(", ", stateStores)));
+ }
+ if (!repartitionTopics.isEmpty()) {
+ result.append(String.format("Following repartition topic(s)
has not been named: %s%n", String.join(", ", repartitionTopics)));
+ }
+ if (ensureExplicitInternalResourceNaming) {
+ throw new TopologyException(result.toString());
+ } else {
+ log.warn("Explicit naming for internal resources is currently
disabled. If you want to enforce" +
+ " user-defined names for all internal resources, set " +
ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG +
+ " to true. Note: Changing internal resource names may
require a full streams application reset for an" +
+ " already deployed application. Consult the documentation
on naming operators for more details. {}", result);
+ }
+ }
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 08e413703c1..a6c79806319 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -22,9 +22,11 @@ import
org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
@@ -35,12 +37,14 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TableJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
@@ -54,6 +58,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.InMemorySessionStore;
@@ -90,6 +95,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import static java.util.Arrays.asList;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
@@ -103,6 +109,7 @@ import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -2354,6 +2361,552 @@ public class StreamsBuilderTest {
assertThrows(TopologyException.class, builder::build);
}
+ @Test
+ void
shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingEnabled() {
+ final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.Long())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void
shouldThrowWhenGroupByAggregationWithRepartitionNameAndLoggingDisabled() {
+ final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(),
Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.Long())
+ .withLoggingDisabled()
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldNotThrowWhenGroupByAggregationWithMaterializedName() {
+ final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldNotThrowWhenGroupByAggregationWithRepartitionNameAndMaterialized() {
+ final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldThrowWhenGroupByAggregationWithoutRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildWithGroupByAggregationTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.Long())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+ }
+
+ private StreamsBuilder buildWithGroupByAggregationTopology(final
Grouped<String, String> grouped,
+ final
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final KStream<String, String> stream = builder.stream("input1");
+ stream
+ .groupBy((k, v) -> v, grouped)
+ .count(materialized)
+ .toStream()
+ .to("output", Produced.as("sink"));
+ return builder;
+ }
+
+ @Test
+ void
shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingEnabled() {
+ final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.Long())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void
shouldThrowWhenGroupByKeyAggregationWithRepartitionNameAndLoggingDisabled() {
+ final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(),
Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.Long())
+ .withLoggingDisabled()
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldNotThrowWhenGroupByKeyAggregationWithMaterializedName() {
+ final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldNotThrowWhenGroupByKeyAggregationWithRepartitionNameAndMaterializedName()
{
+ final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.<String, Long, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.Long())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldThrowWhenGroupByKeyAggregationWithoutRepartitionNameAndMaterializedName()
{
+ final StreamsBuilder builder = buildWithGroupByKeyAggregationTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.Long())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+ }
+
+ private StreamsBuilder buildWithGroupByKeyAggregationTopology(final
Grouped<String, String> grouped,
+ final
Materialized<String, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final KStream<String, String> stream = builder.stream("input1");
+ stream
+ .selectKey((k, v) -> v)
+ .groupByKey(grouped)
+ .count(materialized)
+ .toStream()
+ .to("output", Produced.as("sink"));
+ return builder;
+ }
+
+ @Test
+ void shouldNotThrowWhenSuppressWithSuppressName() {
+ final StreamsBuilder builder =
buildAggregationWithSuppressTopology(true, true);
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldThrowWhenSuppressWithoutSuppressName() {
+ final StreamsBuilder builder =
buildAggregationWithSuppressTopology(false, true);
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KTABLE-SUPPRESS-STATE-STORE-0000000003-changelog"));
+ assertFalse(e.getMessage().contains("Following state store(s) has not
been named"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldThrowWhenSuppressWithoutSuppressNameAndLoggingDisabled() {
+ final StreamsBuilder builder =
buildAggregationWithSuppressTopology(false, false);
+ assertBuildDoesNotThrow(builder);
+ }
+
+ private StreamsBuilder buildAggregationWithSuppressTopology(final boolean
isSuppressNamed,
+ final boolean
isLoggingEnabled) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream = builder.stream("input1");
+ final KTable<Windowed<String>, Long> table = stream
+ .groupByKey()
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
+ .count(Materialized.as("materialized-name"));
+ if (isSuppressNamed) {
+
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
+ .withName("suppressed-name"))
+ .toStream()
+ .to("output", Produced.as("sink"));
+ } else {
+ if (isLoggingEnabled) {
+
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
+ .toStream()
+ .to("output", Produced.as("sink"));
+ } else {
+
table.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().withLoggingDisabled()))
+ .toStream()
+ .to("output", Produced.as("sink"));
+ }
+ }
+ return builder;
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingEnabled() {
+ final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+ StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
+ .withName("repartition-name")
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-JOINTHIS-0000000012-store-changelog,
KSTREAM-OUTEROTHER-0000000013-store-changelog,
KSTREAM-OUTERSHARED-0000000012-store-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-JOINTHIS-0000000012-store,
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKStreamJoinWithRepartitionNameAndLoggingDisabled() {
+ final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+ StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
+ .withName("repartition-name").withLoggingDisabled()
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-JOINTHIS-0000000012-store,
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldThrowWhenKStreamKStreamJoinWithMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+ StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
+ .withStoreName("store-name")
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertFalse(e.getMessage().contains("Following changelog topic(s) has
not been named"));
+ assertFalse(e.getMessage().contains("Following state store(s) has not
been named"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-KEY-SELECT-0000000002-repartition,
KSTREAM-KEY-SELECT-0000000003-repartition"));
+ }
+
+ @Test
+ void
shouldNotThrowWhenKStreamKStreamJoinWithRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+ StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
+ .withName("repartition-name")
+ .withStoreName("store-name")
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKStreamJoinWithoutRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKStreamJoinTopology(
+ StreamJoined.with(Serdes.String(), Serdes.String(),
Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-JOINTHIS-0000000012-store-changelog,
KSTREAM-OUTEROTHER-0000000013-store-changelog,
KSTREAM-OUTERSHARED-0000000012-store-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: KSTREAM-JOINTHIS-0000000012-store,
KSTREAM-OUTEROTHER-0000000013-store, KSTREAM-OUTERSHARED-0000000012-store"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-KEY-SELECT-0000000002-repartition,
KSTREAM-KEY-SELECT-0000000003-repartition"));
+ }
+
+ private StreamsBuilder buildKStreamKStreamJoinTopology(final
StreamJoined<String, String, String> streamJoined) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KStream<String, String> streamTwo =
builder.stream(STREAM_TOPIC_TWO);
+ streamOne
+ .selectKey((k, v) -> v)
+ .leftJoin(
+ streamTwo.selectKey((k, v) -> v),
+ (value1, value2) -> value1,
+ JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
+ streamJoined
+ );
+ return builder;
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingEnabled() {
+ final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(),
Serdes.String()).withName("repartition-name"),
+ Materialized.with(Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: stream-topic-two-STATE-STORE-0000000001"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKTableJoinWithRepartitionNameAndLoggingDisabled() {
+ final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(),
Serdes.String()).withName("repartition-name"),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.String())
+ .withLoggingDisabled()
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: stream-topic-two-STATE-STORE-0000000001"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldThrowWhenKStreamKTableJoinWithMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertFalse(e.getMessage().contains("Following changelog topic(s) has
not been named"));
+ assertFalse(e.getMessage().contains("Following state store(s) has not
been named"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
+ }
+
+ @Test
+ void
shouldNotThrowWhenKStreamKTableJoinWithRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(),
Serdes.String()).withName("repartition-name"),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void
shouldThrowWhenKStreamKTableJoinWithoutRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildKStreamKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: stream-topic-two-STATE-STORE-0000000001-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: stream-topic-two-STATE-STORE-0000000001"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-KEY-SELECT-0000000004-repartition"));
+ }
+
+ private StreamsBuilder buildKStreamKTableJoinTopology(final Joined<String,
String, String> joined,
+ final
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+ final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO,
materialized);
+ stream
+ .selectKey((k, v) -> v)
+ .join(
+ table,
+ (value1, value2) -> value1,
+ joined
+ );
+ return builder;
+ }
+
+
+ @Test
+ void shouldNotThrowWhenKStreamVersionedKTableJoinWithRepartitionName() {
+ final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(),
Serdes.String()).withName("repartition-name")
+
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldThrowWhenKStreamVersionedKTableJoinWithoutRepartitionName() {
+ final StreamsBuilder builder = buildKStreamVersionedKTableJoinTopology(
+ Joined.with(Serdes.String(), Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-JOIN-0000000007-Buffer-changelog"));
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: KSTREAM-JOIN-0000000007-Buffer"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-KEY-SELECT-0000000003-repartition"));
+ }
+
+ private StreamsBuilder buildKStreamVersionedKTableJoinTopology(final
Joined<String, String, String> joined) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final VersionedBytesStoreSupplier versionedStoreSupplier =
+
Stores.persistentVersionedKeyValueStore("versioned-ktable-store",
+ Duration.ofDays(1));
+ final Materialized<String, String, KeyValueStore<Bytes, byte[]>>
materialized =
+ Materialized.<String, String>as(versionedStoreSupplier)
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.String());
+ final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+ final KTable<String, String> table = builder.table(STREAM_TOPIC_TWO,
materialized);
+ stream
+ .selectKey((k, v) -> v)
+ .join(
+ table,
+ (value1, value2) -> value1,
+ joined.withGracePeriod(Duration.ofHours(1))
+ )
+ .to("test-topic");
+ return builder;
+ }
+
+ @Test
+ void shouldNotThrowWhenKStreamGlobalKTableJoinWithMaterializedName() {
+ final StreamsBuilder builder = buildKStreamGlobalKTableJoinTopology(
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldThrowWhenKStreamGlobalKTableJoinWithoutStoreName() {
+ final StreamsBuilder builder =
buildKStreamGlobalKTableJoinTopology(null);
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertFalse(e.getMessage().contains("Following changelog topic(s) has
not been named"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: stream-topic-two-STATE-STORE-0000000001"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+
+ }
+
+ private StreamsBuilder buildKStreamGlobalKTableJoinTopology(final
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+ final KStream<String, String> stream = builder.stream(STREAM_TOPIC);
+ final GlobalKTable<String, String> globalTable;
+ if (materialized != null) {
+ globalTable = builder.globalTable(STREAM_TOPIC_TWO, materialized);
+ } else {
+ globalTable = builder.globalTable(STREAM_TOPIC_TWO);
+ }
+ stream
+ .selectKey((k, v) -> v)
+ .join(
+ globalTable,
+ (k, v) -> k,
+ (value1, value2) -> value1
+ );
+ return builder;
+ }
+
+ @Test
+ void shouldNotThrowWhenRepartitionWithRepartitionName() {
+ final StreamsBuilder builder = buildRepartitionTopology(
+ Repartitioned.with(Serdes.String(), Serdes.String())
+ .withName("repartition-name")
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldThrowWhenRepartitionWithoutRepartition() {
+ final StreamsBuilder builder = buildRepartitionTopology(
+ Repartitioned.with(Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertFalse(e.getMessage().contains("Following changelog topic(s) has
not been named"));
+ assertFalse(e.getMessage().contains("Following state store(s) has not
been named"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: KSTREAM-REPARTITION-0000000001-repartition"));
+ }
+
+ private StreamsBuilder buildRepartitionTopology(final
Repartitioned<String, String> repartitioned) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> stream = builder.stream("input1");
+ stream
+ .repartition(repartitioned)
+ .to("output", Produced.as("sink"));
+ return builder;
+ }
+
+ @Test
+ void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingEnabled() {
+ final StreamsBuilder builder = buildCoGroupTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldThrowWhenCoGroupWithRepartitionNameAndLoggingDisabled() {
+ final StreamsBuilder builder = buildCoGroupTopology(
+ Grouped.with("repartition-name", Serdes.String(),
Serdes.String()),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>with(Serdes.String(), Serdes.String())
+ .withLoggingDisabled()
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertFalse(e.getMessage().contains("Following repartition topic(s)
has not been named"));
+ }
+
+ @Test
+ void shouldNotThrowWhenCoGroupWithMaterializedName() {
+ final StreamsBuilder builder = buildCoGroupTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldNotThrowWhenCoGroupWithRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildCoGroupTopology(
+ Grouped.with("repartition-name", Serdes.String(), Serdes.String()),
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("materialized-name")
+ .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
+ );
+ assertBuildDoesNotThrow(builder);
+ }
+
+ @Test
+ void shouldThrowWhenCoGroupWithoutRepartitionNameAndMaterializedName() {
+ final StreamsBuilder builder = buildCoGroupTopology(
+ Grouped.with(Serdes.String(), Serdes.String()),
+ Materialized.with(Serdes.String(), Serdes.String())
+ );
+ final TopologyException e = assertThrows(TopologyException.class,
builder::build);
+ assertTrue(e.getMessage().contains("Following changelog topic(s) has
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog"));
+ assertTrue(e.getMessage().contains("Following state store(s) has not
been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003"));
+ assertTrue(e.getMessage().contains("Following repartition topic(s) has
not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition"));
+ }
+
+ private StreamsBuilder buildCoGroupTopology(final Grouped<String, String>
grouped,
+ final Materialized<String,
String, KeyValueStore<Bytes, byte[]>> materialized) {
+
+ final Map<Object, Object> props = dummyStreamsConfigMap();
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ final StreamsBuilder builder = new StreamsBuilder(new
TopologyConfig(new StreamsConfig(props)));
+
+ final KStream<String, String> streamOne = builder.stream(STREAM_TOPIC);
+ final KStream<String, String> streamTwo =
builder.stream(STREAM_TOPIC_TWO);
+
+ final KGroupedStream<String, String> groupedOne =
streamOne.groupBy((k, v) -> v, grouped);
+ final KGroupedStream<String, String> groupedTwo =
streamTwo.groupByKey();
+
+ final Aggregator<String, String, String> agg1 = (key, value,
aggregate) -> aggregate + value;
+ final Aggregator<String, String, String> agg2 = (key, value,
aggregate) -> aggregate + value;
+
+ final KTable<String, String> coGroupedStream = groupedOne
+ .cogroup(agg1)
+ .cogroup(groupedTwo, agg2)
+ .aggregate(() -> "", materialized);
+
+ coGroupedStream.toStream().to("output");
+
+ return builder;
+ }
+
private static void assertBuildDoesNotThrow(final StreamsBuilder builder) {
try {
builder.build();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ce99358ad6d..7e9447851e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -69,6 +69,7 @@ import static
org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static
org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static
org.apache.kafka.streams.StreamsConfig.ENABLE_METRICS_PUSH_CONFIG;
+import static
org.apache.kafka.streams.StreamsConfig.ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static
org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static
org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
@@ -129,6 +130,7 @@ public class StreamsConfigTest {
case "DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC":
case "DSL_STORE_SUPPLIERS_CLASS_DOC":
case "PROCESSOR_WRAPPER_CLASS_DOC":
+ case "ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_DOC":
continue;
// check for leaking, but already deprecated members
@@ -1582,6 +1584,18 @@ public class StreamsConfigTest {
assertEquals(RecordCollectorTest.ProductionExceptionHandlerMock.class,
streamsConfig.productionExceptionHandler().getClass());
}
+ @Test
+ public void shouldGetDefaultEnsureExplicitInternalResourceNaming() {
+
assertFalse(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
+ }
+
+ @Test
+ public void shouldEnsureExplicitInternalResourceNaming() {
+ props.put(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG, true);
+ streamsConfig = new StreamsConfig(props);
+
assertTrue(streamsConfig.getBoolean(ENSURE_EXPLICIT_INTERNAL_RESOURCE_NAMING_CONFIG));
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean
isKey) {