This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7200a5c012f KAFKA-19976 Deprecate streams-scala module (#21480)
7200a5c012f is described below
commit 7200a5c012f5f2f49f7fdf73bf7f8719e5591804
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Feb 24 17:10:24 2026 +0900
KAFKA-19976 Deprecate streams-scala module (#21480)
KIP-1244: https://cwiki.apache.org/confluence/x/r4LMFw
* Deprecate streams-scala module
* Add migration document
Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
docs/apis/_index.md | 5 +-
docs/getting-started/upgrade.md | 1 +
docs/streams/developer-guide/dsl-api.md | 6 ++
docs/streams/developer-guide/scala-migration.md | 104 +++++++++++++++++++++
docs/streams/upgrade-guide.md | 6 ++
.../kafka/streams/scala/StreamsBuilder.scala | 1 +
.../kafka/streams/scala/kstream/Branched.scala | 1 +
.../streams/scala/kstream/BranchedKStream.scala | 1 +
.../streams/scala/kstream/CogroupedKStream.scala | 1 +
.../kafka/streams/scala/kstream/Consumed.scala | 1 +
.../kafka/streams/scala/kstream/Grouped.scala | 1 +
.../kafka/streams/scala/kstream/Joined.scala | 1 +
.../streams/scala/kstream/KGroupedStream.scala | 1 +
.../streams/scala/kstream/KGroupedTable.scala | 1 +
.../kafka/streams/scala/kstream/KStream.scala | 1 +
.../kafka/streams/scala/kstream/KTable.scala | 1 +
.../kafka/streams/scala/kstream/Materialized.scala | 1 +
.../kafka/streams/scala/kstream/Produced.scala | 1 +
.../streams/scala/kstream/Repartitioned.scala | 1 +
.../kstream/SessionWindowedCogroupedKStream.scala | 1 +
.../scala/kstream/SessionWindowedKStream.scala | 1 +
.../kafka/streams/scala/kstream/StreamJoined.scala | 1 +
.../kstream/TimeWindowedCogroupedKStream.scala | 1 +
.../scala/kstream/TimeWindowedKStream.scala | 1 +
.../kafka/streams/scala/serialization/Serdes.scala | 1 +
25 files changed, 141 insertions(+), 1 deletion(-)
diff --git a/docs/apis/_index.md b/docs/apis/_index.md
index 428c95f0dd1..c73b0ca16d2 100644
--- a/docs/apis/_index.md
+++ b/docs/apis/_index.md
@@ -101,7 +101,10 @@ To use Kafka Streams, add the following Maven dependency
to your project:
When using Scala you may optionally include the `kafka-streams-scala` library.
Additional documentation on using the Kafka Streams DSL for Scala is available
[in the developer
guide](/43/documentation/streams/developer-guide/dsl-api.html#scala-dsl).
To use Kafka Streams DSL for Scala 2.13, add the following Maven dependency to
your project:
-
+
+> **⚠️ DEPRECATION NOTICE**: The `kafka-streams-scala` library is deprecated
as of Kafka 4.3
+> and will be removed in Kafka 5.0. Please migrate to using the Java Streams
API directly from Scala.
+> See the [migration
guide](/{version}/streams/developer-guide/scala-migration) for details.
<dependency>
<groupId>org.apache.kafka</groupId>
diff --git a/docs/getting-started/upgrade.md b/docs/getting-started/upgrade.md
index 06904a70bff..558b954426b 100644
--- a/docs/getting-started/upgrade.md
+++ b/docs/getting-started/upgrade.md
@@ -36,6 +36,7 @@ type: docs
* Two new configs have been introduced:
`group.coordinator.cached.buffer.max.bytes` and
`share.coordinator.cached.buffer.max.bytes`. They allow the respective
coordinators to set the maximum buffer size retained for reuse. For further
details, please refer to
[KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
* The new config have been introduced: `remote.log.metadata.topic.min.isr`
with 2 as default value. You can correct the min.insync.replicas for the
existed __remote_log_metadata topic via kafka-configs.sh if needed. For further
details, please refer to
[KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
* The new config prefix `remote.log.metadata.admin.` has been introduced. It
allows independent configuration of the admin client used by
`TopicBasedRemoteLogMetadataManager`. For further details, please refer to
[KIP-1208](https://cwiki.apache.org/confluence/x/vYqhFg).
+ * The `kafka-streams-scala` library is deprecated as of Kafka 4.3 and will
be removed in Kafka 5.0. For further details, please refer to the [migration
guide](/{version}/streams/developer-guide/scala-migration).
## Upgrading to 4.2.0
diff --git a/docs/streams/developer-guide/dsl-api.md
b/docs/streams/developer-guide/dsl-api.md
index 28529283d5f..0a981f680c9 100644
--- a/docs/streams/developer-guide/dsl-api.md
+++ b/docs/streams/developer-guide/dsl-api.md
@@ -5738,6 +5738,12 @@ Kafka Streams comes with a `test-utils` module to help
you test your application
# Kafka Streams DSL for Scala
+> **⚠️ DEPRECATION NOTICE**: The Kafka Streams DSL for Scala library
(`kafka-streams-scala`) is
+> **deprecated as of Kafka 4.3** and will be **removed in Kafka 5.0**.
+>
+> **See the [migration
guide](/{version}/streams/developer-guide/scala-migration)** for instructions
+> and code examples showing how to migrate from the Scala wrapper to the Java
API.
+
The Kafka Streams DSL Java APIs are based on the Builder design pattern, which
allows users to incrementally build the target functionality using lower level
compositional fluent APIs. These APIs can be called from Scala, but there are
several issues:
1. **Additional type annotations** \- The Java APIs use Java generics in a
way that are not fully compatible with the type inferencer of the Scala
compiler. Hence the user has to add type annotations to the Scala code, which
seems rather non-idiomatic in Scala.
diff --git a/docs/streams/developer-guide/scala-migration.md
b/docs/streams/developer-guide/scala-migration.md
new file mode 100644
index 00000000000..d32f16cb033
--- /dev/null
+++ b/docs/streams/developer-guide/scala-migration.md
@@ -0,0 +1,104 @@
+---
+title: Migrating from Streams Scala to Java API
+description:
+weight: 16
+tags: ['kafka', 'docs']
+aliases:
+keywords:
+type: docs
+---
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+> **⚠️ DEPRECATION NOTICE**: The `kafka-streams-scala` library is deprecated
as of Kafka 4.3
+> and will be removed in Kafka 5.0. This guide will help you migrate your
Scala applications
+> to use the Java Streams API directly.
+> For more information, see
[KIP-1244](https://cwiki.apache.org/confluence/x/r4LMFw).
+
+## Migration Overview
+
+The Java Streams API works well from Scala with minimal adjustments. The main
differences are:
+
+1. **Use Java types directly** instead of Scala wrapper classes
+2. **Configure Serdes explicitly** via `StreamsConfig` or pass them to methods
+
+### Example: Word Count Application
+
+#### Scala Wrapper Approach (Deprecated)
+
+```scala
+import java.util.Properties
+
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import org.apache.kafka.streams.scala._
+import org.apache.kafka.streams.scala.kstream._
+import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
+import org.apache.kafka.streams.scala.serialization.Serdes._
+
+object WordCountScala extends App {
+ val props = new Properties()
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount")
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+
+ val builder = new StreamsBuilder // Scala wrapper
+ val textLines: KStream[String, String] = builder.stream[String,
String]("input-topic")
+
+ val wordCounts: KTable[String, Long] = textLines
+ .flatMapValues(line => line.toLowerCase.split("\\W+"))
+ .groupBy((_, word) => word)
+ .count()
+
+ wordCounts.toStream.to("output-topic")
+
+ val streams = new KafkaStreams(builder.build(), props)
+ streams.start()
+}
+```
+
+#### Java API Approach
+
+```scala
+import java.util.Properties
+
+import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
+import org.apache.kafka.streams.kstream.{KStream, KTable, Produced}
+import org.apache.kafka.common.serialization.Serdes
+import scala.jdk.CollectionConverters._
+
+object WordCountJava extends App {
+ val props = new Properties()
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount")
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+ // Configure default serdes
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde])
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde])
+
+ val builder = new StreamsBuilder // Java StreamsBuilder
+ val textLines = builder.stream[String, String]("input-topic")
+
+ val wordCounts = textLines
+ .flatMapValues(_.toLowerCase.split("\\W+"))
+ .groupBy((_, word) => word)
+ .count()
+
+ wordCounts.toStream.to("output-topic", Produced.`with`(Serdes.String(),
Serdes.Long()))
+
+ val streams = new KafkaStreams(builder.build(), props)
+ streams.start()
+}
+```
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index be16ebe0235..59bf4aff224 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -67,6 +67,12 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB
version that requires Ma
The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`,
and `poll-ratio`, along with streams state updater metrics
`active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling
measurement window, the ratio of time this thread spends performing the given
action (`{action}`) to the total elapsed time in that window. The effective
window duration is determined by the metrics configurat [...]
+### Deprecation of streams-scala module (KIP-1244)
+
+The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is
deprecated in 4.3.0 and will be removed in 5.0.
+
+For a detailed migration guide with code examples, see [Migrating from Streams
Scala to Java API](/{version}/streams/developer-guide/scala-migration).
+
## Streams API changes in 4.2.0
### General Availability for a core feature set of the Streams Rebalance
Protocol (KIP-1071)
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 25f5ce339b0..d298bf95406 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -30,6 +30,7 @@ import scala.jdk.CollectionConverters._
/**
* Wraps the Java class StreamsBuilder and delegates method calls to the
underlying Java object.
*/
+@deprecated("Use `org.apache.kafka.streams.StreamsBuilder` instead", "4.3.0")
class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala
index 63bcf323afc..d030c501070 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Branched.scala
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.streams.kstream.{Branched => BranchedJ, KStream =>
KStreamJ}
+@deprecated("Use `org.apache.kafka.streams.kstream.Branched` instead", "4.3.0")
object Branched {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
index 196198f6e1a..7d1e12300dd 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/BranchedKStream.scala
@@ -50,6 +50,7 @@ import scala.jdk.CollectionConverters._
* @tparam K Type of keys
* @tparam V Type of values
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.BranchedKStream` instead",
"4.3.0")
class BranchedKStream[K, V](val inner: BranchedKStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
index f8f33e7b4e6..6a66e63dad3 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/CogroupedKStream.scala
@@ -34,6 +34,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{AggregatorFrom
* @param inner The underlying Java abstraction for CogroupedKStream
* @see `org.apache.kafka.streams.kstream.CogroupedKStream`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.CogroupedKStream` instead",
"4.3.0")
class CogroupedKStream[KIn, VOut](val inner: CogroupedKStreamJ[KIn, VOut]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
index 89f461a8fea..9919588d812 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Consumed.scala
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.kstream.{Consumed =>
ConsumedJ}
import org.apache.kafka.streams.{AutoOffsetReset, Topology}
import org.apache.kafka.streams.processor.TimestampExtractor
+@deprecated("Use `org.apache.kafka.streams.kstream.Consumed` instead", "4.3.0")
object Consumed {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
index de1aa4e9833..a5e7f4a3a8c 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Grouped.scala
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Grouped => GroupedJ}
+@deprecated("Use `org.apache.kafka.streams.kstream.Grouped` instead", "4.3.0")
object Grouped {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
index 6233ad15f45..594f798f303 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Joined.scala
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.scala.kstream
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Joined => JoinedJ}
+@deprecated("Use `org.apache.kafka.streams.kstream.Joined` instead", "4.3.0")
object Joined {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
index 01e7c1c5208..740aef5ee9d 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedStream.scala
@@ -42,6 +42,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for KGroupedStream
* @see `org.apache.kafka.streams.kstream.KGroupedStream`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.KGroupedStream` instead",
"4.3.0")
class KGroupedStream[K, V](val inner: KGroupedStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
index 3d9e052a2f1..8e26ce4a749 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KGroupedTable.scala
@@ -32,6 +32,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for KGroupedTable
* @see `org.apache.kafka.streams.kstream.KGroupedTable`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.KGroupedTable` instead",
"4.3.0")
class KGroupedTable[K, V](inner: KGroupedTableJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 76918a6f742..d7665854a3e 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -43,6 +43,7 @@ import scala.jdk.CollectionConverters._
* @see `org.apache.kafka.streams.kstream.KStream`
*/
//noinspection ScalaDeprecation
+@deprecated("Use `org.apache.kafka.streams.kstream.KStream` instead", "4.3.0")
class KStream[K, V](val inner: KStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 6a7f42285a6..f6b5e897a89 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.state.KeyValueStore
* @param inner The underlying Java abstraction for KTable
* @see `org.apache.kafka.streams.kstream.KTable`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.KTable` instead", "4.3.0")
class KTable[K, V](val inner: KTableJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
index 55c09896aca..2741c0f8451 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Materialized.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.scala.{ByteArrayKeyValueStore,
ByteArraySessionStore, ByteArrayWindowStore}
import org.apache.kafka.streams.state.{KeyValueBytesStoreSupplier,
SessionBytesStoreSupplier, WindowBytesStoreSupplier}
+@deprecated("Use `org.apache.kafka.streams.kstream.Materialized` instead",
"4.3.0")
object Materialized {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
index 1f4498b8af5..1dc1153af2b 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Produced.scala
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Produced => ProducedJ}
import org.apache.kafka.streams.processor.StreamPartitioner
+@deprecated("Use `org.apache.kafka.streams.kstream.Produced` instead", "4.3.0")
object Produced {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
index f968c859c52..3a70cd61017 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
import org.apache.kafka.streams.processor.StreamPartitioner
+@deprecated("Use `org.apache.kafka.streams.kstream.Repartitioned` instead",
"4.3.0")
object Repartitioned {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
index 1b20179d5d3..e11129139e5 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedCogroupedKStream.scala
@@ -28,6 +28,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{InitializerFro
* @param inner The underlying Java abstraction for
SessionWindowedCogroupedKStream
* @see `org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream`
*/
+@deprecated("Use
`org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream` instead",
"4.3.0")
class SessionWindowedCogroupedKStream[K, V](val inner:
SessionWindowedCogroupedKStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
index 3d6e157ecdc..3457922e241 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala
@@ -36,6 +36,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for SessionWindowedKStream
* @see `org.apache.kafka.streams.kstream.SessionWindowedKStream`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.SessionWindowedKStream`
instead", "4.3.0")
class SessionWindowedKStream[K, V](val inner: SessionWindowedKStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
index c8c08ba0fd8..cb15d259ef8 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/StreamJoined.scala
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{StreamJoined => StreamJoinedJ}
import org.apache.kafka.streams.state.WindowBytesStoreSupplier
+@deprecated("Use `org.apache.kafka.streams.kstream.StreamJoined` instead",
"4.3.0")
object StreamJoined {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
index ad24228ecc6..45608893660 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedCogroupedKStream.scala
@@ -28,6 +28,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.InitializerFrom
* @param inner The underlying Java abstraction for
TimeWindowedCogroupedKStream
* @see `org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream`
*/
+@deprecated("Use
`org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream` instead",
"4.3.0")
class TimeWindowedCogroupedKStream[K, V](val inner:
TimeWindowedCogroupedKStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
index 4fcf227e037..7dc40e649df 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/TimeWindowedKStream.scala
@@ -35,6 +35,7 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
* @param inner The underlying Java abstraction for TimeWindowedKStream
* @see `org.apache.kafka.streams.kstream.TimeWindowedKStream`
*/
+@deprecated("Use `org.apache.kafka.streams.kstream.TimeWindowedKStream`
instead", "4.3.0")
class TimeWindowedKStream[K, V](val inner: TimeWindowedKStreamJ[K, V]) {
/**
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
index cf643d518a2..c531e47a60e 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes =>
JSerdes, Serializer}
import org.apache.kafka.streams.kstream.WindowedSerdes
+@deprecated("Use org.apache.kafka.common.serialization.Serdes instead",
"4.3.0")
object Serdes extends LowPrioritySerdes {
implicit def stringSerde: Serde[String] = JSerdes.String()
implicit def longSerde: Serde[Long] =
JSerdes.Long().asInstanceOf[Serde[Long]]