This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ddd0d7cce1 KAFKA-19703: Removed versions 2.3 and below from UpgradeFromValues. (#20539)
7ddd0d7cce1 is described below
commit 7ddd0d7cce192cdf4ad64f4d2d6fa8f6140e0f72
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Fri Oct 3 15:00:56 2025 -0700
KAFKA-19703: Removed versions 2.3 and below from UpgradeFromValues. (#20539)
Removed versions 2.3 and below from UpgradeFromValues, including all the
usages of them.
Reviewers: Matthias J. Sax <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 4 +-
docs/streams/upgrade-guide.html | 739 +--------------------
.../org/apache/kafka/streams/StreamsConfig.java | 60 --
.../kafka/streams/internals/UpgradeFromValues.java | 10 -
.../kstream/internals/ChangedSerializer.java | 10 -
.../kstream/internals/KTableRepartitionMap.java | 10 -
.../foreignkeyjoin/SubscriptionWrapperSerde.java | 10 -
.../assignment/AssignorConfiguration.java | 29 -
.../internals/StreamsPartitionAssignorTest.java | 22 -
9 files changed, 8 insertions(+), 886 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html
b/docs/streams/developer-guide/config-streams.html
index 8c45d0c7976..eb65846857d 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -1222,7 +1222,7 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
These optimizations include moving/reducing repartition topics and
reusing the source topic as the changelog for source KTables. These
optimizations will save on network traffic and storage in Kafka without
changing the semantics of your applications. Enabling them is recommended.
</p>
<p>
- Note that as of 2.3, you need to do two things to enable
optimizations. In addition to setting this config to
<code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
+ Note that you need to do two things to enable optimizations. In
addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll
need to pass in your
configuration properties when building your topology by using the
overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new
KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
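To make the two steps concrete, here is a minimal sketch in Java; the
application id, bootstrap servers, and topic names are placeholder values:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // Step 1: request topology optimizations via the config.
    props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);

    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("input-topic").to("output-topic"); // placeholder topology
    // Step 2: pass the same properties when building the topology, so the
    // optimizer can apply the config to the physical plan.
    KafkaStreams myStream = new KafkaStreams(builder.build(props), props);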
@@ -1235,7 +1235,7 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
The version you are upgrading from. It is important to set this
config when performing a rolling upgrade to certain versions, as described in
the upgrade guide.
You should set this config to the appropriate version before
bouncing your instances and upgrading them to the newer version. Once everyone
is on the
newer version, you should remove this config and do a second rolling
bounce. It is only necessary to set this config and follow the two-bounce
upgrade path
- when upgrading from below version 2.0, or when upgrading to 2.4+
from any version lower than 2.4.
+ when upgrading to 3.4+ from any version lower than 3.4.
</div>
</blockquote>
</div>
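As a sketch of what this looks like in code (assuming a Properties object
named props that is passed to KafkaStreams; the value "3.3" is only an
example of an older version):

    // First rolling bounce: deploy the new binaries with upgrade.from set
    // to the version you are upgrading from.
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.3"); // example value

    // Second rolling bounce: once every instance runs the new version,
    // remove the entry again and bounce each instance once more.
    props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);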
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 99c221cbbb5..ecd2a3bbfe9 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -35,9 +35,8 @@
<p>
Upgrading from any older version to {{fullDotVersion}} is possible: if
upgrading from 3.4 or below, you will need to do two rolling bounces, where
during the first rolling bounce phase you set the config
<code>upgrade.from="older version"</code>
- (possible values are <code>"0.10.0" - "3.4"</code>) and during the
second you remove it. This is required to safely handle 3 changes. The first is
introduction of the new cooperative rebalancing protocol of the embedded
consumer. The second is a change in foreign-key join serialization format.
- Note that you will remain using the old eager rebalancing protocol if
you skip or delay the second rolling bounce, but you can safely switch over to
cooperative at any time once the entire group is on 2.4+ by removing the config
value and bouncing. For more details please refer to
- <a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>.
The third is a change in the serialization format for an internal repartition
topic. For more details, please refer to <a
href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>:
+ (possible values are <code>"2.4" - "3.4"</code>) and during the second
you remove it. This is required to safely handle 2 changes. The first is a
change in foreign-key join serialization format.
+ The second is a change in the serialization format for an internal
repartition topic. For more details, please refer to <a
href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>:
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make
sure that config <code>upgrade.from</code> is set to the version from which it
is being upgraded.</li>
@@ -45,24 +44,12 @@
<li> prepare your newly deployed {{fullDotVersion}} application
instances for a second round of rolling bounces; make sure to remove the value
for config <code>upgrade.from</code> </li>
<li> bounce each instance of your application once more to complete
the upgrade </li>
</ul>
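To make the two rounds concrete, here is a minimal sketch of the relevant
configuration in each phase; the version string is illustrative, and
StreamsConfig also defines named constants (for example
StreamsConfig.UPGRADE_FROM_33) for the supported values:

    // Round 1: every instance is bounced with upgrade.from set.
    Properties round1 = new Properties();
    round1.put(StreamsConfig.UPGRADE_FROM_CONFIG, "3.3"); // version being upgraded from

    // Round 2: every instance is bounced again with the config removed.
    Properties round2 = new Properties();
    // (no upgrade.from entry)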
- <p> As an alternative, an offline upgrade is also possible. Upgrading from
any versions as old as 0.10.0.x to {{fullDotVersion}} in offline mode require
the following steps: </p>
+ <p> As an alternative, an offline upgrade is also possible. Upgrading from
any version as old as 0.11.0.x to {{fullDotVersion}} in offline mode requires
the following steps: </p>
<ul>
- <li> stop all old (e.g., 0.10.0.x) application instances </li>
+ <li> stop all old (e.g., 0.11.0.x) application instances </li>
<li> update your code and swap old code and jar file with new code and
new jar file </li>
<li> restart all new ({{fullDotVersion}}) application instances </li>
</ul>
- <p>
- Note: The cooperative rebalancing protocol has been the default since
2.4, but we have continued to support the
- eager rebalancing protocol to provide users an upgrade path. This
support will be dropped in a future release,
- so any users still on the eager protocol should prepare to finish
upgrading their applications to the cooperative protocol in version 3.1.
- This only affects users who are still on a version older than 2.4, and
users who have upgraded already but have not yet
- removed the <code>upgrade.from</code> config that they set when
upgrading from a version below 2.4.
- Users fitting into the latter case will simply need to unset this
config when upgrading beyond 3.1,
- while users in the former case will need to follow a slightly
different upgrade path if they attempt to upgrade from 2.3 or below to a
version above 3.1.
- Those applications will need to go through a bridge release, by first
upgrading to a version between 2.4 - 3.1 and setting the
<code>upgrade.from</code> config,
- then removing that config and upgrading to the final version above
3.1. See <a
href="https://issues.apache.org/jira/browse/KAFKA-8575">KAFKA-8575</a>
- for more details.
- </p>
<p>For a table that shows Streams API compatibility with Kafka broker
versions, see <a href="#streams_api_broker_compat">Broker Compatibility</a>.</p>
@@ -121,24 +108,6 @@
<p>Since 2.6.0 release, Kafka Streams depends on a RocksDB version that
requires MacOS 10.14 or higher.</p>
- <p>
- To run a Kafka Streams application version 2.2.1, 2.3.0, or higher a
broker version 0.11.0 or higher is required
- and the on-disk message format must be 0.11 or higher.
- Brokers must be on version 0.10.1 or higher to run a Kafka Streams
application version 0.10.1 to 2.2.0.
- Additionally, on-disk message format must be 0.10 or higher to run a
Kafka Streams application version 1.0 to 2.2.0.
- For Kafka Streams 0.10.0, broker version 0.10.0 or higher is required.
- </p>
-
- <p>
- In deprecated <code>KStreamBuilder</code> class, when a
<code>KTable</code> is created from a source topic via
<code>KStreamBuilder.table()</code>, its materialized state store
- will reuse the source topic as its changelog topic for restoring, and
will disable logging to avoid appending new updates to the source topic; in the
<code>StreamsBuilder</code> class introduced in 1.0, this behavior was changed
- accidentally: we still reuse the source topic as the changelog topic
for restoring, but will also create a separate changelog topic to append the
update records from source topic to. In the 2.0 release, we have fixed this
issue and now users
- can choose whether or not to reuse the source topic based on the
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code>: if you are upgrading
from the old <code>KStreamBuilder</code> class and hence you need to change
your code to use
- the new <code>StreamsBuilder</code>, you should set this config value
to <code>StreamsConfig#OPTIMIZE</code> to continue reusing the source topic; if
you are upgrading from 1.0 or 1.1 where you are already using
<code>StreamsBuilder</code> and hence have already
- created a separate changelog topic, you should set this config value
to <code>StreamsConfig#NO_OPTIMIZATION</code> when upgrading to
{{fullDotVersion}} in order to use that changelog topic for restoring the state
store.
- More details about the new config
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a
href="https://cwiki.apache.org/confluence/x/V53LB">KIP-295</a>.
- </p>
-
<h3><a id="streams_api_changes_410"
href="#streams_api_changes_410">Streams API changes in 4.1.0</a></h3>
<h4>Early Access of the Streams Rebalance Protocol</h4>
@@ -1150,705 +1119,9 @@
Hence, this feature won't be supported any longer and
you need to update your code accordingly.
If you use a custom <code>PartitionGrouper</code> and stop using it,
the created tasks might change.
Hence, you will need to reset your application to upgrade it.
-
-
- <h3 class="anchor-heading"><a id="streams_api_changes_230"
class="anchor-link"></a><a href="#streams_api_changes_230">Streams API changes
in 2.3.0</a></h3>
-
- <p>Version 2.3.0 adds the Suppress operator to the
<code>kafka-streams-scala</code> Ktable API.</p>
-
- <p>
- As of 2.3.0 Streams now offers an in-memory version of the window (<a
href="https://cwiki.apache.org/confluence/x/6AQlBg">KIP-428</a>)
- and the session (<a
href="https://cwiki.apache.org/confluence/x/DiqGBg">KIP-445</a>) store, in
addition to the persistent ones based on RocksDB.
- The new public interfaces <code>inMemoryWindowStore()</code> and
<code>inMemorySessionStore()</code> are added to <code>Stores</code> and
provide the built-in in-memory window or session store.
- </p>
-
- <p>
- As of 2.3.0 we've updated how to turn on optimizations. Now to enable
optimizations, you need to do two things.
- First add this line to your properties
<code>properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);</code>, as you have done before.
- Second, when constructing your <code>KafkaStreams</code> instance,
you'll need to pass your configuration properties when building your
- topology by using the overloaded
<code>StreamsBuilder.build(Properties)</code> method.
- For example <code>KafkaStreams myStream = new
KafkaStreams(streamsBuilder.build(properties), properties)</code>.
- </p>
-
- <p>
- In 2.3.0 we have added default implementations to <code>close()</code>
and <code>configure()</code> for <code>Serializer</code>,
- <code>Deserializer</code> and <code>Serde</code> so that they can be
implemented by lambda expressions.
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/fgw0BQ">KIP-331</a>.
- </p>
-
- <p>
- To improve operator semantics, new store types are added that allow
storing an additional timestamp per key-value pair or window.
- Some DSL operators (for example KTables) are using those new stores.
- Hence, you can now retrieve the last update timestamp via Interactive
Queries if you specify
- <code>TimestampedKeyValueStoreType</code> or
<code>TimestampedWindowStoreType</code> as your <code>QueryableStoreType</code>.
- While this change is mainly transparent, there are some corner cases
that may require code changes:
- <strong>Caution: If you receive an untyped store and use a cast, you
might need to update your code to cast to the correct type.
- Otherwise, you might get an exception similar to
- <code>java.lang.ClassCastException: class
org.apache.kafka.streams.state.ValueAndTimestamp cannot be cast to class
YOUR-VALUE-TYPE</code>
- upon getting a value from the store.</strong>
- Additionally, <code>TopologyTestDriver#getStateStore()</code> only
returns non-built-in stores and throws an exception if a built-in store is
accessed.
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/0j6HB">KIP-258</a>.
- </p>
-
- <p>
- To improve type safety, a new operator
<code>KStream#flatTransformValues</code> is added.
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/bUgYBQ">KIP-313</a>.
- </p>
-
- <p>
- Kafka Streams used to set the configuration parameter
<code>max.poll.interval.ms</code> to <code>Integer.MAX_VALUE</code>.
- This default value is removed and Kafka Streams uses the consumer
default value now.
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/1COGBg">KIP-442</a>.
- </p>
-
- <p>
- Default configuration for repartition topic was changed:
- The segment size for index files (<code>segment.index.bytes</code>) is
no longer 50MB, but uses the cluster default.
- Similarly, the configuration <code>segment.ms</code> is no longer 10
minutes, but uses the cluster default configuration.
- Lastly, the retention period (<code>retention.ms</code>) is changed
from <code>Long.MAX_VALUE</code> to <code>-1</code> (infinite).
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/4iOGBg">KIP-443</a>.
- </p>
-
- <p>
- To avoid memory leaks, <code>RocksDBConfigSetter</code> has a new
<code>close()</code> method that is called on shutdown.
- Users should implement this method to release any memory used by
RocksDB config objects, by closing those objects.
- For more details please read <a
href="https://cwiki.apache.org/confluence/x/QhaZBg">KIP-453</a>.
- </p>
-
- <p>
- RocksDB dependency was updated to version <code>5.18.3</code>.
- The new version allows to specify more RocksDB configurations,
including <code>WriteBufferManager</code> which helps to limit RocksDB off-heap
memory usage.
- For more details please read <a
href="https://issues.apache.org/jira/browse/KAFKA-8215">KAFKA-8215</a>.
- </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_220"
class="anchor-link"></a><a href="#streams_api_changes_220">Streams API changes
in 2.2.0</a></h3>
- <p>
- We've simplified the <code>KafkaStreams#state</code> transition
diagram during the starting up phase a bit in 2.2.0: in older versions the
state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then
to <code>REBALANCING</code> to get the first
- stream task assignment, and then back to <code>RUNNING</code>;
starting in 2.2.0 it will transit from <code>CREATED</code> directly to
<code>REBALANCING</code> and then to <code>RUNNING</code>.
- If you have registered a <code>StateListener</code> that captures
state transition events, you may need to adjust your listener implementation
accordingly for this simplification (in practice, your listener logic should be
very unlikely to be affected at all).
- </p>
-
- <p>
- In <code>WindowedSerdes</code>, we've added a new static constructor
to return a <code>TimeWindowSerde</code> with configurable window size. This is
to help users to construct time window serdes to read directly from a
time-windowed store's changelog.
- More details can be found in <a
href="https://cwiki.apache.org/confluence/x/WYTQBQ">KIP-393</a>.
- </p>
-
- <p>
- In 2.2.0 we have extended a few public interfaces including
<code>KafkaStreams</code> to extend <code>AutoCloseable</code> so that they can
be
- used in a try-with-resource statement. For a full list of public
interfaces that get impacted please read <a
href="https://cwiki.apache.org/confluence/x/-AeQBQ">KIP-376</a>.
- </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_210"
class="anchor-link"></a><a href="#streams_api_changes_210">Streams API changes
in 2.1.0</a></h3>
- <p>
- We updated <code>TopologyDescription</code> API to allow for better
runtime checking.
- Users are encouraged to use <code>#topicSet()</code> and
<code>#topicPattern()</code> accordingly on
<code>TopologyDescription.Source</code> nodes,
- instead of using <code>#topics()</code>, which has since been
deprecated. Similarly, use <code>#topic()</code> and
<code>#topicNameExtractor()</code>
- to get descriptions of <code>TopologyDescription.Sink</code> nodes.
For more details, see
- <a href="https://cwiki.apache.org/confluence/x/NQU0BQ">KIP-321</a>.
- </p>
-
- <p>
- We've added a new class <code>Grouped</code> and deprecated
<code>Serialized</code>. The intent of adding <code>Grouped</code> is the
ability to
- name repartition topics created when performing aggregation
operations. Users can name the potential repartition topic using the
- <code>Grouped#as()</code> method which takes a <code>String</code> and
is used as part of the repartition topic name. The resulting repartition
- topic name will still follow the pattern of
<code>${application-id}-<name>-repartition</code>. The
<code>Grouped</code> class is now favored over
- <code>Serialized</code> in <code>KStream#groupByKey()</code>,
<code>KStream#groupBy()</code>, and <code>KTable#groupBy()</code>.
- Note that Kafka Streams does not automatically create repartition
topics for aggregation operations.
-
- Additionally, we've updated the <code>Joined</code> class with a new
method <code>Joined#withName</code>
- enabling users to name any repartition topics required for performing
Stream/Stream or Stream/Table join. For more details repartition
- topic naming, see <a
href="https://cwiki.apache.org/confluence/x/mgJ1BQ">KIP-372</a>.
-
- As a result we've updated the Kafka Streams Scala API and removed the
<code>Serialized</code> class in favor of adding <code>Grouped</code>.
- If you just rely on the implicit <code>Serialized</code>, you just
need to recompile; if you pass in <code>Serialized</code> explicitly, sorry
you'll have to make code changes.
- </p>
-
- <p>
- We've added a new config named <code>max.task.idle.ms</code> to allow
users to specify how to handle out-of-order data within a task that may be
processing multiple
- topic-partitions (see <a
href="/{{version}}/documentation/streams/core-concepts.html#streams_out_of_ordering">Out-of-Order
Handling</a> section for more details).
- The default value is set to <code>0</code>, to favor minimized latency
over synchronization between multiple input streams from topic-partitions.
- If users would like to wait for longer time when some of the
topic-partitions do not have data available to process and hence cannot
determine its corresponding stream time,
- they can override this config to a larger value.
- </p>
-
- <p>
- We've added the missing
<code>SessionBytesStoreSupplier#retentionPeriod()</code> to be consistent with
the <code>WindowBytesStoreSupplier</code> which allows users to get the
specified retention period for session-windowed stores.
- We've also added the missing
<code>StoreBuilder#withCachingDisabled()</code> to allow users to turn off
caching for their customized stores.
- </p>
-
- <p>
- We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that
you can use via <code>Serdes.UUID()</code>
- (cf. <a href="https://cwiki.apache.org/confluence/x/26hjB">KIP-206</a>).
- </p>
-
- <p>
- We updated a list of methods that take <code>long</code> arguments as
either timestamp (fix point) or duration (time period)
- and replaced them with <code>Instant</code> and <code>Duration</code>
parameters for improved semantics.
- Some old methods based on <code>long</code> are deprecated and users
are encouraged to update their code.
- <br />
- In particular, aggregation windows (hopping/tumbling/unlimited time
windows and session windows) as well as join windows now take
<code>Duration</code>
- arguments to specify window size, hop, and gap parameters.
- Also, window sizes and retention times are now specified as
<code>Duration</code> type in <code>Stores</code> class.
- The <code>Window</code> class has new methods
<code>#startTime()</code> and <code>#endTime()</code> that return window
start/end timestamp as <code>Instant</code>.
- For interactive queries, there are new <code>#fetch(...)</code>
overloads taking <code>Instant</code> arguments.
- Additionally, punctuations are now registered via
<code>ProcessorContext#schedule(Duration interval, ...)</code>.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>.
- </p>
-
- <p>
- We deprecated <code>KafkaStreams#close(...)</code> and replaced it
with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout
argument.
- Note: the new <code>#close(Duration)</code> method has improved (but
slightly different) semantics.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>.
- </p>
-
- <p>
- The newly exposed <code>AdminClient</code> metrics are now available
when calling the <code>KafkaStream#metrics()</code> method.
- For more details on exposing <code>AdminClients</code> metrics
- see <a href="https://cwiki.apache.org/confluence/x/lQg0BQ">KIP-324</a>
- </p>
-
- <p>
- We deprecated the notion of segments in window stores as those are
intended to be an implementation detail.
- Thus, method <code>Windows#segments()</code> and variable
<code>Windows#segments</code> were deprecated.
- If you implement custom windows, you should update your code
accordingly.
- Similarly, <code>WindowBytesStoreSupplier#segments()</code> was
deprecated and replaced with
<code>WindowBytesStoreSupplier#segmentInterval()</code>.
- If you implement custom window store, you need to update your code
accordingly.
- Finally, <code>Stores#persistentWindowStore(...)</code> was
deprecated and replaced with a new overload that does not allow to specify the
number of segments any longer.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/mQU0BQ">KIP-319</a>
- (note: <a
href="https://cwiki.apache.org/confluence/x/sQU0BQ">KIP-328</a> and
- <a href="https://cwiki.apache.org/confluence/x/IBNPBQ">KIP-358</a>
'overlap' with KIP-319).
- </p>
-
- <p>
- We've added an overloaded <code>StreamsBuilder#build</code> method
that accepts an instance of <code>java.util.Properties</code> with the intent
of using the
- <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> config added
in Kafka Streams 2.0. Before 2.1, when building a topology with
- the DSL, Kafka Streams writes the physical plan as the user makes
calls on the DSL. Now by providing a <code>java.util.Properties</code>
instance when
- executing a <code>StreamsBuilder#build</code> call, Kafka Streams can
optimize the physical plan of the topology, provided the
<code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code>
- config is set to <code>StreamsConfig#OPTIMIZE</code>. By setting
<code>StreamsConfig#OPTIMIZE</code> in addition to the <code>KTable</code>
optimization of
- reusing the source topic as the changelog topic, the topology may be
optimized to merge redundant repartition topics into one
- repartition topic. The original no parameter version of
<code>StreamsBuilder#build</code> is still available for those who wish to not
- optimize their topology. Note that enabling optimization of the
topology may require you to do an application reset when redeploying the
application. For more
- details, see <a
href="https://cwiki.apache.org/confluence/x/CkcYBQ">KIP-312</a>
- </p>
-
- <p>
- We are introducing static membership to Kafka Streams users. This
feature reduces unnecessary rebalances during normal application upgrades or
rolling bounces.
- For more details on how to use it, check out <a
href="/{{version}}/documentation/#static_membership">static membership
design</a>.
- Note, Kafka Streams uses the same
<code>ConsumerConfig#GROUP_INSTANCE_ID_CONFIG</code>, and you only need to make
sure it is uniquely defined across
- different stream instances in one application.
- </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_200"
class="anchor-link"></a><a href="#streams_api_changes_200">Streams API changes
in 2.0.0</a></h3>
- <p>
- In 2.0.0 we have added a few new APIs on the
<code>ReadOnlyWindowStore</code> interface (for details please read <a
href="#streams_api_changes_200">Streams API changes</a> below).
- If you have customized window store implementations that extends the
<code>ReadOnlyWindowStore</code> interface you need to make code changes.
- </p>
-
- <p>
- In addition, if you are using Java 8 method references in your Kafka
Streams code you might need to update your code to resolve method ambiguities.
- Hot-swapping the jar-file only might not work for this case.
- See below a complete list of <a
href="#streams_api_changes_200">2.0.0</a>
- API and semantic changes that allow you to advance your application
and/or simplify your code base.
- </p>
-
- <p>
- We moved <code>Consumed</code> interface from
<code>org.apache.kafka.streams</code> to
<code>org.apache.kafka.streams.kstream</code>
- as it was mistakenly placed in the previous release. If your code has
already used it there is a simple one-liner change needed in your import
statement.
- </p>
-
- <p>
- We have also removed, in 2.0.0, some public APIs that were deprecated
prior to 1.0.x.
- See below for a detailed list of removed APIs.
- </p>
- <p>
- We have removed the <code>skippedDueToDeserializationError-rate</code>
and <code>skippedDueToDeserializationError-total</code> metrics.
- Deserialization errors, and all other causes of record skipping, are
now accounted for in the pre-existing metrics
- <code>skipped-records-rate</code> and
<code>skipped-records-total</code>. When a record is skipped, the event is
- now logged at WARN level. If these warnings become burdensome, we
recommend explicitly filtering out unprocessable
- records instead of depending on record skipping semantics. For more
details, see
- <a href="https://cwiki.apache.org/confluence/x/gFOHB">KIP-274</a>.
- As of right now, the potential causes of skipped records are:
- </p>
- <ul>
- <li><code>null</code> keys in table sources</li>
- <li><code>null</code> keys in table-table inner/left/outer/right
joins</li>
- <li><code>null</code> keys or values in stream-table joins</li>
- <li><code>null</code> keys or values in stream-stream joins</li>
- <li><code>null</code> keys or values in aggregations on grouped
streams</li>
- <li><code>null</code> keys or values in reductions on grouped
streams</li>
- <li><code>null</code> keys in aggregations on windowed streams</li>
- <li><code>null</code> keys in reductions on windowed streams</li>
- <li><code>null</code> keys in aggregations on session-windowed
streams</li>
- <li>
- Errors producing results, when the configured
<code>default.production.exception.handler</code> decides to
- <code>CONTINUE</code> (the default is to <code>FAIL</code> and
throw an exception).
- </li>
- <li>
- Errors deserializing records, when the configured
<code>default.deserialization.exception.handler</code>
- decides to <code>CONTINUE</code> (the default is to
<code>FAIL</code> and throw an exception).
- This was the case previously captured in the
<code>skippedDueToDeserializationError</code> metrics.
- </li>
- <li>Fetched records having a negative timestamp.</li>
- </ul>
-
- <p>
- We've also fixed the metrics name for time and session windowed store
operations in 2.0. As a result, our current built-in stores
- will have their store types in the metric names as
<code>in-memory-state</code>, <code>in-memory-lru-state</code>,
- <code>rocksdb-state</code>, <code>rocksdb-window-state</code>, and
<code>rocksdb-session-state</code>. For example, a RocksDB time windowed store's
- put operation metrics would now be
-
<code>kafka.streams:type=stream-rocksdb-window-state-metrics,client-id=([-.\w]+),task-id=([-.\w]+),rocksdb-window-state-id=([-.\w]+)</code>.
- Users need to update their metrics collecting and reporting systems
for their time and session windowed stores accordingly.
- For more details, please read the <a
href="/{{version}}/documentation/#kafka_streams_store_monitoring">State Store
Metrics</a> section.
- </p>
-
- <p>
- We have added support for methods in <code>ReadOnlyWindowStore</code>
which allows for querying a single window's key-value pair.
- For users who have customized window store implementations on the
above interface, they'd need to update their code to implement the newly added
method as well.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/UUSHB">KIP-261</a>.
- </p>
- <p>
- We have added public <code>WindowedSerdes</code> to allow users to
read from / write to a topic storing windowed table changelogs directly.
- In addition, in <code>StreamsConfig</code> we have also added
<code>default.windowed.key.serde.inner</code> and
<code>default.windowed.value.serde.inner</code>
- to let users specify inner serdes if the default serde classes are
windowed serdes.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/_keHB">KIP-265</a>.
- </p>
- <p>
- We've added message header support in the <code>Processor API</code>
in Kafka 2.0.0. In particular, we have added a new API
<code>ProcessorContext#headers()</code>
- which returns a <code>Headers</code> object that keeps track of the
headers of the source topic's message that is being processed. Through this
object, users can manipulate
- the headers map that is being propagated throughout the processor
topology as well. For more details please feel free to read
- the <a
href="/{{version}}/documentation/streams/developer-guide/processor-api.html#accessing-processor-context">Developer
Guide</a> section.
- </p>
- <p>
- We have deprecated constructors of <code>KafkaStreams</code> that take
a <code>StreamsConfig</code> as parameter.
- Please use the other corresponding constructors that accept
<code>java.util.Properties</code> instead.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/KLRzB">KIP-245</a>.
- </p>
- <p>
- Kafka 2.0.0 allows to manipulate timestamps of output records using
the Processor API (<a
href="https://cwiki.apache.org/confluence/x/Ih6HB">KIP-251</a>).
- To enable this new feature, <code>ProcessorContext#forward(...)</code>
was modified.
- The two existing overloads <code>#forward(Object key, Object value,
String childName)</code> and <code>#forward(Object key, Object value, int
childIndex)</code> were deprecated and a new overload <code>#forward(Object
key, Object value, To to)</code> was added.
- The new class <code>To</code> allows you to send records to all or
specific downstream processors by name and to set the timestamp for the output
record.
- Forwarding based on child index is not supported in the new API any
longer.
- </p>
- <p>
- We have added support to allow routing records dynamically to Kafka
topics. More specifically, in both the lower-level
<code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we
have added variants that
- take a <code>TopicNameExtractor</code> instance instead of a specific
<code>String</code> typed topic name, such that for each received record from
the upstream processor, the library will dynamically determine which Kafka
topic to write to
- based on the record's key and value, as well as record context. Note
that all the Kafka topics that may possibly be used are still considered as
user topics and hence required to be pre-created. In addition to that, we have
modified the
- <code>StreamPartitioner</code> interface to add the topic name
parameter since the topic name now may not be known beforehand; users who have
customized implementations of this interface would need to update their code
while upgrading their application
- to use Kafka Streams 2.0.0.
- </p>
- <p>
- <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a>
changed the retention time for repartition topics by setting its default value
to <code>Long.MAX_VALUE</code>.
- Instead of relying on data retention, Kafka Streams now uses the purge
data API to delete consumed data from those topics and to keep the used
storage small.
- </p>
- <p>
- We have modified the <code>ProcessorStateManager#register(...)</code>
signature and removed the deprecated <code>loggingEnabled</code> boolean
parameter as it is specified in the <code>StoreBuilder</code>.
- Users who used this function to register their state stores into the
processor topology need to simply update their code and remove this parameter
from the caller.
- </p>
- <p>
- Kafka Streams DSL for Scala is a new Kafka Streams client library
available for developers authoring Kafka Streams applications in Scala. It
wraps core Kafka Streams DSL types to make it easier to call when
- interoperating with Scala code. For example, it includes higher order
functions as parameters for transformations, avoiding the need for anonymous
classes in Java 7 or experimental SAM type conversions in Scala 2.11,
- automatic conversion between Java and Scala collection types, a way
- to implicitly provide Serdes to reduce boilerplate from your
application and make it more typesafe, and more! For more information see the
- <a
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#scala-dsl">Kafka
Streams DSL for Scala documentation</a> and
- <a href="https://cwiki.apache.org/confluence/x/c06HB">KIP-270</a>.
- </p>
- <p>
- We have removed these deprecated APIs:
- </p>
- <ul>
- <li><code>KafkaStreams#toString</code> no longer returns the topology
and runtime metadata; to get topology metadata users can call
<code>Topology#describe()</code> and to get thread runtime metadata users can
call <code>KafkaStreams#localThreadsMetadata</code> (they are deprecated since
1.0.0).
- For detailed guidance on how to update your code please read <a
href="#streams_api_changes_100">here</a></li>
- <li><code>TopologyBuilder</code> and <code>KStreamBuilder</code> are
removed and replaced by <code>Topology</code> and <code>StreamsBuilder</code>
respectively (they are deprecated since 1.0.0).
- For detailed guidance on how to update your code please read <a
href="#streams_api_changes_100">here</a></li>
- <li><code>StateStoreSupplier</code> are removed and replaced with
<code>StoreBuilder</code> (they are deprecated since 1.0.0);
- and the corresponding <code>Stores#create</code> and
<code>KStream, KTable, KGroupedStream</code> overloaded functions that use it
have also been removed.
- For detailed guidance on how to update your code please read <a
href="#streams_api_changes_100">here</a></li>
- <li><code>KStream, KTable, KGroupedStream</code> overloaded functions
that requires serde and other specifications explicitly are removed and
replaced with simpler overloaded functions that use <code>Consumed, Produced,
Serialized, Materialized, Joined</code> (they are deprecated since 1.0.0).
- For detailed guidance on how to update your code please read <a
href="#streams_api_changes_100">here</a></li>
- <li><code>Processor#punctuate</code>,
<code>ValueTransformer#punctuate</code>,
<code>ValueTransformer#punctuate</code> and
<code>ProcessorContext#schedule(long)</code> are removed and replaced by
<code>ProcessorContext#schedule(long, PunctuationType, Punctuator)</code> (they
are deprecated in 1.0.0). </li>
- <li>The second <code>boolean</code> typed parameter "loggingEnabled"
in <code>ProcessorContext#register</code> has been removed; users can now use
<code>StoreBuilder#withLoggingEnabled, withLoggingDisabled</code> to specify
the behavior when they create the state store. </li>
- <li><code>KTable#writeAs, print, foreach, to, through</code> are
removed, users can call <code>KTable#toStream()#writeAs</code> instead for the
same purpose (they are deprecated since 0.11.0.0).
- For detailed list of removed APIs please read <a
href="#streams_api_changes_0110">here</a></li>
- <li><code>StreamsConfig#KEY_SERDE_CLASS_CONFIG,
VALUE_SERDE_CLASS_CONFIG, TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> are removed
and replaced with <code>StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG,
DEFAULT_VALUE_SERDE_CLASS_CONFIG,
DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG</code> respectively (they are
deprecated since 0.11.0.0). </li>
- <li><code>StreamsConfig#ZOOKEEPER_CONNECT_CONFIG</code> are removed as
we do not need ZooKeeper dependency in Streams any more (it is deprecated since
0.10.2.0). </li>
- </ul>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_110"
class="anchor-link"></a><a href="#streams_api_changes_110">Streams API changes
in 1.1.0</a></h3>
- <p>
- We have added support for methods in <code>ReadOnlyWindowStore</code>
which allows for querying <code>WindowStore</code>s without the necessity of
providing keys.
- For users who have customized window store implementations on the
above interface, they'd need to update their code to implement the newly added
method as well.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/6qdjB">KIP-205</a>.
- </p>
-
- <p>
- There is a new artifact <code>kafka-streams-test-utils</code>
providing a <code>TopologyTestDriver</code>,
<code>ConsumerRecordFactory</code>, and <code>OutputVerifier</code> class.
- You can include the new artifact as a regular dependency to your
unit tests and use the test driver to test your business logic of your Kafka
Streams application.
- For more details, see <a
href="https://cwiki.apache.org/confluence/x/EQOHB">KIP-247</a>.
- </p>
-
- <p>
- The introduction of <a
href="https://cwiki.apache.org/confluence/x/QJ5zB">KIP-220</a>
- enables you to provide configuration parameters for the embedded admin
client created by Kafka Streams, similar to the embedded producer and consumer
clients.
- You can provide the configs via <code>StreamsConfig</code> by adding
the configs with the prefix <code>admin.</code> as defined by
<code>StreamsConfig#adminClientPrefix(String)</code>
- to distinguish them from configurations of other clients that share
the same config names.
- </p>
-
- <p>
- New method in <code>KTable</code>
- </p>
- <ul>
- <li> <code>transformValues</code> methods have been added to
<code>KTable</code>. Similar to those on <code>KStream</code>, these methods
allow for richer, stateful, value transformation similar to the Processor
API.</li>
- </ul>
-
- <p>
- New method in <code>GlobalKTable</code>
- </p>
- <ul>
- <li> A method has been provided such that it will return the store name
associated with the <code>GlobalKTable</code> or <code>null</code> if the store
name is non-queryable. </li>
- </ul>
-
- <p>
- New methods in <code>KafkaStreams</code>:
- </p>
- <ul>
- <li> added overload for the constructor that allows overriding the
<code>Time</code> object used for tracking system wall-clock time; this is
useful for unit testing your application code. </li>
- </ul>
-
- <p> New methods in <code>KafkaClientSupplier</code>: </p>
- <ul>
- <li> added <code>getAdminClient(config)</code> that allows to override
an <code>AdminClient</code> used for administrative requests such as internal
topic creations, etc. </li>
- </ul>
-
- <p>New error handling for exceptions during production:</p>
- <ul>
- <li>added interface <code>ProductionExceptionHandler</code> that
allows implementors to decide whether or not Streams should <code>FAIL</code>
or <code>CONTINUE</code> when certain exceptions occur while trying to
produce.</li>
- <li>provided an implementation,
<code>DefaultProductionExceptionHandler</code> that always fails, preserving
the existing behavior by default.</li>
- <li>changing which implementation is used can be done by setting
<code>default.production.exception.handler</code> to the fully qualified name
of a class implementing this interface.</li>
- </ul>
-
- <p> Changes in <code>StreamsResetter</code>: </p>
- <ul>
- <li> added options to specify input topics offsets to reset according
to <a href="https://cwiki.apache.org/confluence/x/ApI7B">KIP-171</a></li>
- </ul>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_100"
class="anchor-link"></a><a href="#streams_api_changes_100">Streams API changes
in 1.0.0</a></h3>
-
- <p>
- With 1.0 a major API refactoring was accomplished and the new API is
cleaner and easier to use.
- This change includes the five main classes <code>KafkaStreams</code>,
<code>KStreamBuilder</code>,
- <code>KStream</code>, <code>KTable</code>, and
<code>TopologyBuilder</code> (and some more others).
- All changes are fully backward compatible as old API is only
deprecated but not removed.
- We recommend to move to the new API as soon as you can.
- We will summarize all API changes in the next paragraphs.
- </p>
-
- <p>
- The two main classes to specify a topology via the DSL
(<code>KStreamBuilder</code>)
- or the Processor API (<code>TopologyBuilder</code>) were deprecated
and replaced by
- <code>StreamsBuilder</code> and <code>Topology</code> (both new
classes are located in
- package <code>org.apache.kafka.streams</code>).
- Note, that <code>StreamsBuilder</code> does not extend
<code>Topology</code>, i.e.,
- the class hierarchy is different now.
- The new classes have basically the same methods as the old ones to
build a topology via DSL or Processor API.
- However, some internal methods that were public in
<code>KStreamBuilder</code>
- and <code>TopologyBuilder</code> but not part of the actual API are
not present
- in the new classes any longer.
- Furthermore, some overloads were simplified compared to the original
classes.
- See <a href="https://cwiki.apache.org/confluence/x/uR8IB">KIP-120</a>
- and <a href="https://cwiki.apache.org/confluence/x/TYZjB">KIP-182</a>
- for full details.
- </p>
-
- <p>
- Changing how a topology is specified also affects
<code>KafkaStreams</code> constructors,
- that now only accept a <code>Topology</code>.
- Using the DSL builder class <code>StreamsBuilder</code> one can get
the constructed
- <code>Topology</code> via <code>StreamsBuilder#build()</code>.
- Additionally, a new class
<code>org.apache.kafka.streams.TopologyDescription</code>
- (and some more dependent classes) were added.
- Those can be used to get a detailed description of the specified
topology
- and can be obtained by calling <code>Topology#describe()</code>.
- An example using this new API is shown in the <a
href="/{{version}}/documentation/streams/quickstart">quickstart section</a>.
- </p>
-
- <p>
- New methods in <code>KStream</code>:
- </p>
- <ul>
- <li>With the introduction of <a
href="https://cwiki.apache.org/confluence/x/66JjB">KIP-202</a>
- a new method <code>merge()</code> has been created in
<code>KStream</code> as the StreamsBuilder class's
<code>StreamsBuilder#merge()</code> has been removed.
- The method signature was also changed: instead of providing
multiple <code>KStream</code>s to the method at once, only a single
<code>KStream</code> is accepted.
- </li>
- </ul>
-
- <p>
- New methods in <code>KafkaStreams</code>:
- </p>
- <ul>
- <li>retrieve the current runtime information about the local threads
via <code>localThreadsMetadata()</code> </li>
- <li>observe the restoration of all state stores via
<code>setGlobalStateRestoreListener()</code>, in which users can provide their
customized implementation of the
<code>org.apache.kafka.streams.processor.StateRestoreListener</code>
interface</li>
- </ul>
-
- <p>
- Deprecated / modified methods in <code>KafkaStreams</code>:
- </p>
- <ul>
- <li>
- <code>toString()</code>, <code>toString(final String
indent)</code> were previously used to return static and runtime information.
- They have been deprecated in favor of using the new
classes/methods <code>localThreadsMetadata()</code> /
<code>ThreadMetadata</code> (returning runtime information) and
- <code>TopologyDescription</code> /
<code>Topology#describe()</code> (returning static information).
- </li>
- <li>
- With the introduction of <a
href="https://cwiki.apache.org/confluence/x/TYZjB">KIP-182</a>
- you should no longer pass in <code>Serde</code> to
<code>KStream#print</code> operations.
- If you can't rely on using <code>toString</code> to print your
keys and values, you should instead provide a custom
<code>KeyValueMapper</code> via the <code>Printed#withKeyValueMapper</code>
call.
- </li>
- <li>
- <code>setStateListener()</code> can now only be set before the
application starts running, i.e. before <code>KafkaStreams.start()</code> is
called.
- </li>
- </ul>
-
- <p>
- Deprecated methods in <code>KGroupedStream</code>
- </p>
- <ul>
- <li>
- Windowed aggregations have been deprecated from
<code>KGroupedStream</code> and moved to <code>WindowedKStream</code>.
- You can now perform a windowed aggregation by, for example, using
<code>KGroupedStream#windowedBy(Windows)#reduce(Reducer)</code>.
- </li>
- </ul>
-
- <p>
- Modified methods in <code>Processor</code>:
- </p>
- <ul>
- <li>
- <p>
- The Processor API was extended to allow users to schedule
<code>punctuate</code> functions either based on data-driven <b>stream time</b>
or wall-clock time.
- As a result, the original
<code>ProcessorContext#schedule</code> is deprecated with a new overloaded
function that accepts a user customizable <code>Punctuator</code> callback
interface, which triggers its <code>punctuate</code> API method periodically
based on the <code>PunctuationType</code>.
- The <code>PunctuationType</code> determines what notion of
time is used for the punctuation scheduling: either <a
href="/{{version}}/documentation/streams/core-concepts#streams_time">stream
time</a> or wall-clock time (by default, <b>stream time</b> is configured to
represent event time via <code>TimestampExtractor</code>).
- In addition, the <code>punctuate</code> function inside
<code>Processor</code> is also deprecated.
- </p>
- <p>
- Before this, users could only schedule based on stream time
(i.e. <code>PunctuationType.STREAM_TIME</code>) and hence the
<code>punctuate</code> function was data-driven only because stream time is
determined (and advanced forward) by the timestamps derived from the input data.
- If there is no data arriving at the processor, the stream time
would not advance and hence punctuation will not be triggered.
- On the other hand, when wall-clock time (i.e.
<code>PunctuationType.WALL_CLOCK_TIME</code>) is used, <code>punctuate</code>
will be triggered purely based on wall-clock time.
- So for example if the <code>Punctuator</code> function is
scheduled based on <code>PunctuationType.WALL_CLOCK_TIME</code>, if these 60
records were processed within 20 seconds,
- <code>punctuate</code> would be called 2 times (one time every
10 seconds);
- if these 60 records were processed within 5 seconds, then no
<code>punctuate</code> would be called at all.
- Users can schedule multiple <code>Punctuator</code> callbacks
with different <code>PunctuationType</code>s within the same processor by
simply calling <code>ProcessorContext#schedule</code> multiple times inside
processor's <code>init()</code> method.
- </p>
- </li>
- </ul>
-
- <p>
- If you are monitoring on task level or processor-node / state store
level Streams metrics, please note that the metrics sensor name and hierarchy
was changed:
- The task ids, store names and processor names are no longer in the
sensor metrics names, but instead are added as tags of the sensors to achieve
consistent metrics hierarchy.
- As a result you may need to make corresponding code changes on your
metrics reporting and monitoring tools when upgrading to 1.0.0.
- Detailed metrics sensor can be found in the <a
href="/{{version}}/documentation/#kafka_streams_monitoring">Streams
Monitoring</a> section.
- </p>
-
- <p>
- The introduction of <a
href="https://cwiki.apache.org/confluence/x/WQgwB">KIP-161</a>
- enables you to provide a default exception handler for deserialization
errors when reading data from Kafka rather than throwing the exception all the
way out of your streams application.
- You can provide the configs via the <code>StreamsConfig</code> as
<code>StreamsConfig#DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG</code>.
- The specified handler must implement the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.
- </p>
-
- <p>
- The introduction of <a
href="https://cwiki.apache.org/confluence/x/aZM7B">KIP-173</a>
- enables you to provide topic configuration parameters for any topics
created by Kafka Streams.
- This includes repartition and changelog topics.
- You can provide the configs via the <code>StreamsConfig</code> by
adding the configs with the prefix as defined by
<code>StreamsConfig#topicPrefix(String)</code>.
- Any properties in the <code>StreamsConfig</code> with the prefix will
be applied when creating internal topics.
- Any configs that aren't topic configs will be ignored.
- If you already use <code>StateStoreSupplier</code> or
<code>Materialized</code> to provide configs for changelogs, then they will
take precedence over those supplied in the config.
- </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_0110"
class="anchor-link"></a><a href="#streams_api_changes_0110">Streams API changes
in 0.11.0.0</a></h3>
-
- <p> Updates in <code>StreamsConfig</code>: </p>
- <ul>
- <li> new configuration parameter <code>processing.guarantee</code> is
added </li>
- <li> configuration parameter <code>key.serde</code> was deprecated and
replaced by <code>default.key.serde</code> </li>
- <li> configuration parameter <code>value.serde</code> was deprecated
and replaced by <code>default.value.serde</code> </li>
- <li> configuration parameter <code>timestamp.extractor</code> was
deprecated and replaced by <code>default.timestamp.extractor</code> </li>
- <li> method <code>keySerde()</code> was deprecated and replaced by
<code>defaultKeySerde()</code> </li>
- <li> method <code>valueSerde()</code> was deprecated and replaced by
<code>defaultValueSerde()</code> </li>
- <li> new method <code>defaultTimestampExtractor()</code> was added
</li>
- </ul>
-
- <p> New methods in <code>TopologyBuilder</code>: </p>
- <ul>
- <li> added overloads for <code>addSource()</code> that allow to define
a <code>TimestampExtractor</code> per source node </li>
- <li> added overloads for <code>addGlobalStore()</code> that allow to
define a <code>TimestampExtractor</code> per source node associated with the
global store </li>
- </ul>
-
- <p> New methods in <code>KStreamBuilder</code>: </p>
- <ul>
- <li> added overloads for <code>stream()</code> that allow to define a
<code>TimestampExtractor</code> per input stream </li>
- <li> added overloads for <code>table()</code> that allow to define a
<code>TimestampExtractor</code> per input table </li>
- <li> added overloads for <code>globalKTable()</code> that allow to
define a <code>TimestampExtractor</code> per global table </li>
- </ul>
-
- <p> Deprecated methods in <code>KTable</code>: </p>
- <ul>
- <li> <code>void foreach(final ForeachAction<? super K, ? super
V> action)</code> </li>
- <li> <code>void print()</code> </li>
- <li> <code>void print(final String streamName)</code> </li>
- <li> <code>void print(final Serde<K> keySerde, final
Serde<V> valSerde)</code> </li>
- <li> <code>void print(final Serde<K> keySerde, final
Serde<V> valSerde, final String streamName)</code> </li>
- <li> <code>void writeAsText(final String filePath)</code> </li>
- <li> <code>void writeAsText(final String filePath, final String
streamName)</code> </li>
- <li> <code>void writeAsText(final String filePath, final
Serde<K> keySerde, final Serde<V> valSerde)</code> </li>
- <li> <code>void writeAsText(final String filePath, final String
streamName, final Serde<K> keySerde, final Serde<V>
valSerde)</code> </li>
- </ul>
-
- <p>
- The above methods have been deprecated in favor of using the
Interactive Queries API.
- If you want to query the current content of the state store backing
the KTable, use the following approach:
</p>
- <ul>
- <li> Make a call to <code>KafkaStreams.store(final String storeName,
final QueryableStoreType<T> queryableStoreType)</code> </li>
- <li> Then make a call to <code>ReadOnlyKeyValueStore.all()</code> to
iterate over the keys of a <code>KTable</code>. </li>
- </ul>
- <p>
- If you want to view the changelog stream of the <code>KTable</code>
then you could call <code>KTable.toStream().print(Printed.toSysOut)</code>.
- </p>
-
- <p> Metrics using exactly-once semantics: </p>
- <p>
- If <code>"exactly_once"</code> processing (EOS version 1) is enabled
via the <code>processing.guarantee</code> parameter,
- internally Streams switches from a producer-per-thread to a
producer-per-task runtime model.
- Using <code>"exactly_once_beta"</code> (EOS version 2) does use a
producer-per-thread, so <code>client.id</code> doesn't change,
- compared with <code>"at_least_once"</code> for this case).
- In order to distinguish the different producers, the producer's
<code>client.id</code> additionally encodes the task-ID for this case.
- Because the producer's <code>client.id</code> is used to report JMX
metrics, it might be required to update tools that receive those metrics.
- </p>
-
- <p> Producer's <code>client.id</code> naming schema: </p>
- <ul>
- <li> at-least-once (default):
<code>[client.Id]-StreamThread-[sequence-number]</code> </li>
- <li> exactly-once:
<code>[client.Id]-StreamThread-[sequence-number]-[taskId]</code> </li>
- <li> exactly-once-beta:
<code>[client.Id]-StreamThread-[sequence-number]</code> </li>
- </ul>
- <p> <code>[client.Id]</code> is either set via Streams configuration
parameter <code>client.id</code> or defaults to
<code>[application.id]-[processId]</code> (<code>[processId]</code> is a random
UUID). </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_01021"
class="anchor-link"></a><a href="#streams_api_changes_01021">Notable changes in
0.10.2.1</a></h3>
-
- <p>
- Parameter updates in <code>StreamsConfig</code>:
- </p>
- <ul>
- <li> The default config values of embedded producer's
<code>retries</code> and consumer's <code>max.poll.interval.ms</code> have been
changed to improve the resiliency of a Kafka Streams application </li>
- </ul>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_0102"
class="anchor-link"></a><a href="#streams_api_changes_0102">Streams API changes
in 0.10.2.0</a></h3>
-
- <p>
- New methods in <code>KafkaStreams</code>:
- </p>
- <ul>
- <li> set a listener to react on application state change via
<code>setStateListener(StateListener listener)</code> </li>
- <li> retrieve the current application state via <code>state()</code>
</li>
- <li> retrieve the global metrics registry via <code>metrics()</code>
</li>
- <li> apply a timeout when closing an application via <code>close(long
timeout, TimeUnit timeUnit)</code> </li>
- <li> specify a custom indent when retrieving Kafka Streams information
via <code>toString(String indent)</code> </li>
- </ul>
-
- <p>
- Parameter updates in <code>StreamsConfig</code>:
- </p>
- <ul>
- <li> parameter <code>zookeeper.connect</code> was deprecated; a Kafka
Streams application no longer interacts with ZooKeeper for topic management
but uses the new broker admin protocol
- (cf. <a href="https://cwiki.apache.org/confluence/x/vBEIAw">KIP-4,
Section "Topic Admin Schema"</a>) </li>
- <li> added many new parameters for metrics, security, and client
configurations </li>
- </ul>
- <p> Changes in <code>StreamsMetrics</code> interface: </p>
- <ul>
- <li> removed methods: <code>addLatencySensor()</code> </li>
- <li> added methods: <code>addLatencyAndThroughputSensor()</code>,
<code>addThroughputSensor()</code>, <code>recordThroughput()</code>,
- <code>addSensor()</code>, <code>removeSensor()</code> </li>
- </ul>
-
- <p> New methods in <code>TopologyBuilder</code>: </p>
- <ul>
- <li> added overloads for <code>addSource()</code> that allow to define
a <code>auto.offset.reset</code> policy per source node </li>
- <li> added methods <code>addGlobalStore()</code> to add global
<code>StateStore</code>s </li>
- </ul>
-
- <p> New methods in <code>KStreamBuilder</code>: </p>
- <ul>
- <li> added overloads for <code>stream()</code> and
<code>table()</code> that allow to define a <code>auto.offset.reset</code>
policy per input stream/table </li>
- <li> added method <code>globalKTable()</code> to create a
<code>GlobalKTable</code> </li>
- </ul>
-
- <p> New joins for <code>KStream</code>: </p>
- <ul>
- <li> added overloads for <code>join()</code> to join with
<code>KTable</code> (see the sketch after this list) </li>
- <li> added overloads for <code>join()</code> and
<code>leftJoin()</code> to join with <code>GlobalKTable</code> </li>
- <li> note, join semantics in 0.10.2 were improved and thus you might
see different results compared to 0.10.0.x and 0.10.1.x
- (cf. <a href="https://cwiki.apache.org/confluence/x/EzPtAw">Kafka
Streams Join Semantics</a> in the Apache Kafka wiki) </li>
- </ul>
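
A minimal sketch of the KStream-KTable join overload, assuming String keys
and values; topic and store names are placeholders:

    final KStreamBuilder builder = new KStreamBuilder();
    final KStream<String, String> orders = builder.stream("orders");
    final KTable<String, String> customers = builder.table("customers", "customers-store");
    // inner join: emits output only for order records whose key has a
    // matching customer entry in the table
    final KStream<String, String> enriched =
        orders.join(customers, (order, customer) -> order + " / " + customer);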
-
- <p> Aligned <code>null</code>-key handling for <code>KTable</code> joins:
</p>
- <ul>
- <li> like all other KTable operations, <code>KTable-KTable</code>
joins do not throw an exception on <code>null</code> key records anymore, but
drop those records silently </li>
- </ul>
-
- <p> New window type <em>Session Windows</em>: </p>
- <ul>
- <li> added class <code>SessionWindows</code> to specify session
windows </li>
- <li> added overloads for <code>KGroupedStream</code> methods
<code>count()</code>, <code>reduce()</code>, and <code>aggregate()</code>
- to allow session window aggregations (see the sketch after this list) </li>
- </ul>
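
A minimal sketch of a session-windowed count, assuming a
<code>KStream&lt;String, String&gt;</code> named <code>clicks</code> built
elsewhere; the 5-minute gap and the store name are placeholders:

    final KTable<Windowed<String>, Long> sessionCounts =
        clicks.groupByKey()
              .count(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)), "session-counts");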
-
- <p> Changes to <code>TimestampExtractor</code>: </p>
- <ul>
- <li> method <code>extract()</code> now takes a second parameter, the
previously extracted timestamp (see the sketch after this list) </li>
- <li> new default timestamp extractor class
<code>FailOnInvalidTimestamp</code>
- (it gives the same behavior as the old (and removed) default extractor
<code>ConsumerRecordTimestampExtractor</code>) </li>
- <li> new alternative timestamp extractor classes
<code>LogAndSkipOnInvalidTimestamp</code> and
<code>UsePreviousTimeOnInvalidTimestamp</code> </li>
- </ul>
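
A minimal sketch of a custom extractor using the two-argument
<code>extract()</code>, mirroring the fallback behavior of
<code>UsePreviousTimeOnInvalidTimestamp</code>:

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            final long ts = record.timestamp();
            // negative timestamps are invalid; fall back to the last valid one
            return ts >= 0 ? ts : previousTimestamp;
        }
    }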
-
- <p> Relaxed type constraints of many DSL interfaces, classes, and methods
(cf. <a href="https://cwiki.apache.org/confluence/x/dQMIB">KIP-100</a>). </p>
-
- <h3 class="anchor-heading"><a id="streams_api_changes_0101"
class="anchor-link"></a><a href="#streams_api_changes_0101">Streams API changes
in 0.10.1.0</a></h3>
-
- <p> Stream grouping and aggregation split into two methods: </p>
- <ul>
- <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey()
</li>
- <li> new: KStream#groupByKey() plus KGroupedStream #aggregate(),
#reduce(), and #count() </li>
- <li> Example: stream.countByKey() changes to
stream.groupByKey().count() (see the sketch after this list) </li>
- </ul>
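
A minimal sketch of the split, assuming a <code>KStream&lt;String,
String&gt;</code> named <code>stream</code> built elsewhere; "counts" is a
placeholder store name:

    // old (0.10.0.x): KTable<String, Long> counts = stream.countByKey("counts");
    // new (0.10.1+): grouping and aggregation are separate steps
    final KTable<String, Long> counts = stream.groupByKey().count("counts");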
-
- <p> Auto Repartitioning: </p>
- <ul>
- <li> a call to through() after a key-changing operator and before an
aggregation/join is no longer required </li>
- <li> Example: stream.selectKey(...).through(...).countByKey() changes
to stream.selectKey(...).groupByKey().count() </li>
- </ul>
-
- <p> TopologyBuilder: </p>
- <ul>
- <li> methods #sourceTopics(String applicationId) and
#topicGroups(String applicationId) were simplified to #sourceTopics() and
#topicGroups() </li>
- </ul>
-
- <p> DSL: new parameter to specify state store names: </p>
- <ul>
- <li> The new Interactive Queries feature requires specifying a store
name for all source KTables and window aggregation result KTables (the previous
parameter "operator/window name" is now the storeName) </li>
- <li> KStreamBuilder#table(String topic) changes to #table(String
topic, String storeName) </li>
- <li> KTable#through(String topic) changes to #through(String topic,
String storeName) </li>
- <li> KGroupedStream #aggregate(), #reduce(), and #count() require an
additional parameter "String storeName" </li>
- <li> Example: stream.countByKey(TimeWindows.of("windowName", 1000))
changes to stream.groupByKey().count(TimeWindows.of(1000), "countStoreName")
(see the sketch after this list) </li>
- </ul>
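
A minimal sketch of the windowed variant with an explicit store name,
assuming a <code>KStream&lt;String, String&gt;</code> named
<code>stream</code> built elsewhere; window size and store name are
placeholders:

    final KTable<Windowed<String>, Long> windowedCounts =
        stream.groupByKey().count(TimeWindows.of(60 * 1000L), "windowed-counts");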
-
- <p> Windowing: </p>
- <ul>
- <li> Windows are not named anymore: TimeWindows.of("name", 1000)
changes to TimeWindows.of(1000) (cf. DSL: new parameter to specify state store
names) </li>
- <li> JoinWindows has no default size anymore:
JoinWindows.of("name").within(1000) changes to JoinWindows.of(1000) </li>
- </ul>
+ <p>For Streams API changes in versions older than 2.4.x, please see the <a
href="/39/documentation/streams/upgrade-guide">3.9 upgrade guide</a>.</p>
<h3 class="anchor-heading"><a id="streams_api_broker_compat"
class="anchor-link"></a><a href="#streams_api_broker_compat">Streams API broker
compatibility</a></h3>
@@ -1864,7 +1137,7 @@
<tbody>
<tr>
<td>Kafka Streams API (rows)</td>
- <td>2.1.x and<br>2.2.x and<br>2.3.x and<br>2.4.x and<br>2.5.x
and<br>2.6.x and<br>2.7.x and<br>2.8.x and<br>3.0.x and<br>3.1.x and<br>3.2.x
and<br>3.3.x and<br>3.4.x and<br>3.5.x and<br>3.6.x and<br>3.7.x and<br>3.8.x
and<br>3.9.x and<br>4.0.x</td>
+ <td>2.4.x and<br>2.5.x and<br>2.6.x and<br>2.7.x and<br>2.8.x
and<br>3.0.x and<br>3.1.x and<br>3.2.x and<br>3.3.x and<br>3.4.x and<br>3.5.x
and<br>3.6.x and<br>3.7.x and<br>3.8.x and<br>3.9.x and<br>4.0.x</td>
<td>4.1.x</td>
</tr>
<tr>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 2050db40443..4830fb960b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -287,66 +287,6 @@ public class StreamsConfig extends AbstractConfig {
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS,
MERGE_REPARTITION_TOPICS,
SINGLE_STORE_SELF_JOIN);
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 0.10.0.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_0100 =
UpgradeFromValues.UPGRADE_FROM_0100.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 0.10.1.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_0101 =
UpgradeFromValues.UPGRADE_FROM_0101.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 0.10.2.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_0102 =
UpgradeFromValues.UPGRADE_FROM_0102.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 0.11.0.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_0110 =
UpgradeFromValues.UPGRADE_FROM_0110.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 1.0.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_10 =
UpgradeFromValues.UPGRADE_FROM_10.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 1.1.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_11 =
UpgradeFromValues.UPGRADE_FROM_11.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 2.0.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_20 =
UpgradeFromValues.UPGRADE_FROM_20.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 2.1.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_21 =
UpgradeFromValues.UPGRADE_FROM_21.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 2.2.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_22 =
UpgradeFromValues.UPGRADE_FROM_22.toString();
-
- /**
- * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 2.3.x}.
- */
- @SuppressWarnings("WeakerAccess")
- public static final String UPGRADE_FROM_23 =
UpgradeFromValues.UPGRADE_FROM_23.toString();
-
/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"}
for upgrading an application from version {@code 2.4.x}.
*/
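
With the pre-2.4 constants removed, only the remaining values can be used for
the two-bounce upgrade; a minimal sketch, with 3.3 as an arbitrary example
source version:

    final Properties props = new Properties();
    // first rolling bounce: new binaries declare the version they replace
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_33);
    // second rolling bounce: remove upgrade.from again
    props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);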
diff --git
a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
index 798383980b5..312ef0622af 100644
---
a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
+++
b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
@@ -17,16 +17,6 @@
package org.apache.kafka.streams.internals;
public enum UpgradeFromValues {
- UPGRADE_FROM_0100("0.10.0"),
- UPGRADE_FROM_0101("0.10.1"),
- UPGRADE_FROM_0102("0.10.2"),
- UPGRADE_FROM_0110("0.11.0"),
- UPGRADE_FROM_10("1.0"),
- UPGRADE_FROM_11("1.1"),
- UPGRADE_FROM_20("2.0"),
- UPGRADE_FROM_21("2.1"),
- UPGRADE_FROM_22("2.2"),
- UPGRADE_FROM_23("2.3"),
UPGRADE_FROM_24("2.4"),
UPGRADE_FROM_25("2.5"),
UPGRADE_FROM_26("2.6"),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 4964c707d9b..219115c3b93 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -58,16 +58,6 @@ public class ChangedSerializer<T> implements
Serializer<Change<T>>, WrappingNull
}
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
- case UPGRADE_FROM_0100:
- case UPGRADE_FROM_0101:
- case UPGRADE_FROM_0102:
- case UPGRADE_FROM_0110:
- case UPGRADE_FROM_10:
- case UPGRADE_FROM_11:
- case UPGRADE_FROM_20:
- case UPGRADE_FROM_21:
- case UPGRADE_FROM_22:
- case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index a686692b40a..40bd37c0f60 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -100,16 +100,6 @@ public class KTableRepartitionMap<K, V, K1, V1> implements
KTableRepartitionMapS
}
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
- case UPGRADE_FROM_0100:
- case UPGRADE_FROM_0101:
- case UPGRADE_FROM_0102:
- case UPGRADE_FROM_0110:
- case UPGRADE_FROM_10:
- case UPGRADE_FROM_11:
- case UPGRADE_FROM_20:
- case UPGRADE_FROM_21:
- case UPGRADE_FROM_22:
- case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
index b03b24749e0..064d464d8be 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
@@ -77,16 +77,6 @@ public class SubscriptionWrapperSerde<KLeft> extends
WrappingNullableSerde<Subsc
}
switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
- case UPGRADE_FROM_0100:
- case UPGRADE_FROM_0101:
- case UPGRADE_FROM_0102:
- case UPGRADE_FROM_0110:
- case UPGRADE_FROM_10:
- case UPGRADE_FROM_11:
- case UPGRADE_FROM_20:
- case UPGRADE_FROM_21:
- case UPGRADE_FROM_22:
- case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index b210e638eca..dbf237df152 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
-import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
@@ -59,8 +58,6 @@ public final class AssignorConfiguration {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());
- validateUpgradeFrom();
-
{
final Object o =
configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
@@ -94,32 +91,6 @@ public final class AssignorConfiguration {
return referenceContainer;
}
- // cooperative rebalancing was introduced in 2.4 and the old protocol
(eager rebalancing) was removed
- // in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer
possible without a bridge release
- public void validateUpgradeFrom() {
- final String upgradeFrom =
streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
- if (upgradeFrom != null) {
- switch (UpgradeFromValues.fromString(upgradeFrom)) {
- case UPGRADE_FROM_0100:
- case UPGRADE_FROM_0101:
- case UPGRADE_FROM_0102:
- case UPGRADE_FROM_0110:
- case UPGRADE_FROM_10:
- case UPGRADE_FROM_11:
- case UPGRADE_FROM_20:
- case UPGRADE_FROM_21:
- case UPGRADE_FROM_22:
- case UPGRADE_FROM_23:
- final String errMsg = String.format(
- "The eager rebalancing protocol is no longer supported
in 4.0 which means live upgrades from 2.3 or below are not possible."
- + " Please see the Streams upgrade guide for the
bridge releases and recommended upgrade path. Got upgrade.from='%s'",
upgradeFrom);
- log.error(errMsg);
- throw new ConfigException(errMsg);
-
- }
- }
- }
-
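
The explicit validation can be dropped because, assuming
UpgradeFromValues.fromString(String) still resolves config values via the
enum, a removed value now fails at parse time:

    // with UPGRADE_FROM_23 gone from the enum, the old config value is
    // rejected up front rather than by the assignor
    UpgradeFromValues.fromString("2.3"); // throws IllegalArgumentException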
public String logPrefix() {
return logPrefix;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index fc040157633..08674857b71 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -570,28 +570,6 @@ public class StreamsPartitionAssignorTest {
assertThat(interleavedTaskIds, equalTo(assignment));
}
- @ParameterizedTest
- @MethodSource("parameter")
- public void shouldThrowOnEagerSubscription(final Map<String, Object>
parameterizedConfig) {
- setUp(parameterizedConfig, false);
- builder.addSource(null, "source1", null, null, null, "topic1");
- builder.addSource(null, "source2", null, null, null, "topic2");
- builder.addProcessor("processor", new MockApiProcessorSupplier<>(),
"source1", "source2");
-
- final Set<TaskId> prevTasks = Set.of(
- new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)
- );
- final Set<TaskId> standbyTasks = Set.of(
- new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)
- );
-
- createMockTaskManager(prevTasks, standbyTasks);
- assertThrows(
- ConfigException.class,
- () ->
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
- );
- }
-
@ParameterizedTest
@MethodSource("parameter")
public void testCooperativeSubscription(final Map<String, Object>
parameterizedConfig) {