This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 866f0cc3080 KAFKA-16339: [3/4 KStream#transformValues] Remove 
Deprecated "transformer" methods and classes (#17266)
866f0cc3080 is described below

commit 866f0cc30808c7150f82d7a08d1f37611ed28ef9
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Fri Nov 22 20:07:03 2024 -0300

    KAFKA-16339: [3/4 KStream#transformValues] Remove Deprecated "transformer" 
methods and classes (#17266)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 511 ++-------------------
 .../kafka/streams/kstream/TransformerSupplier.java |   1 -
 .../kafka/streams/kstream/ValueTransformer.java    |   5 +-
 .../streams/kstream/ValueTransformerSupplier.java  |   2 -
 .../streams/kstream/ValueTransformerWithKey.java   |   5 +-
 .../kstream/ValueTransformerWithKeySupplier.java   |   2 -
 .../streams/kstream/internals/KStreamImpl.java     |  73 ---
 .../kstream/internals/KStreamTransformValues.java  |  74 ---
 .../streams/processor/ConnectedStoreProvider.java  |   6 +-
 .../apache/kafka/streams/StreamsBuilderTest.java   |  18 -
 .../streams/kstream/internals/KStreamImplTest.java | 295 +-----------
 .../internals/KStreamTransformValuesTest.java      | 154 -------
 .../kafka/streams/scala/kstream/KStream.scala      |  88 ----
 13 files changed, 53 insertions(+), 1181 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 241df1cd3ec..5c8c5ef092d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -48,8 +48,8 @@ import org.apache.kafka.streams.state.StoreBuilder;
  * A {@code KStream} can be transformed record by record, joined with another 
{@code KStream}, {@link KTable},
  * {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
  * Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (c.f. 
{@link Topology}) via
- * {@link #process(ProcessorSupplier, String...) process(...)} and {@link 
#transformValues(ValueTransformerSupplier,
- * String...) transformValues(...)}.
+ * {@link #process(ProcessorSupplier, String...) process(...)} and {@link 
#processValues(FixedKeyProcessorSupplier,
+ * String...) processValues(...)}.
  *
  * @param <K> Type of keys
  * @param <V> Type of values
@@ -206,8 +206,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper);
 
@@ -245,8 +244,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? 
extends KeyValue<? extends KR, ? extends VR>> mapper,
                                  final Named named);
@@ -256,7 +254,7 @@ public interface KStream<K, V> {
      * The provided {@link ValueMapper} is applied to each input record value 
and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerSupplier, String...)} for 
stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for 
stateful value processing).
      * <p>
      * The example below counts the number of token of the value string.
      * <pre>{@code
@@ -280,8 +278,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> 
mapper);
 
@@ -290,7 +287,7 @@ public interface KStream<K, V> {
      * The provided {@link ValueMapper} is applied to each input record value 
and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerSupplier, String...)} for 
stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for 
stateful value processing).
      * <p>
      * The example below counts the number of token of the value string.
      * <pre>{@code
@@ -315,8 +312,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> 
mapper,
                                   final Named named);
@@ -326,7 +322,7 @@ public interface KStream<K, V> {
      * The provided {@link ValueMapperWithKey} is applied to each input record 
value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} 
for stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for 
stateful value processing).
      * <p>
      * The example below counts the number of tokens of key and value strings.
      * <pre>{@code
@@ -351,8 +347,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super 
V, ? extends VR> mapper);
 
@@ -361,7 +356,7 @@ public interface KStream<K, V> {
      * The provided {@link ValueMapperWithKey} is applied to each input record 
value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateless record-by-record operation (cf.
-     * {@link #transformValues(ValueTransformerWithKeySupplier, String...)} 
for stateful value transformation).
+     * {@link #processValues(FixedKeyProcessorSupplier, String...)} for 
stateful value processing).
      * <p>
      * The example below counts the number of tokens of key and value strings.
      * <pre>{@code
@@ -387,8 +382,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      */
     <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super 
V, ? extends VR> mapper,
                                   final Named named);
@@ -436,8 +430,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -487,8 +480,7 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -502,8 +494,8 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapper} is applied to each input record and 
computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerSupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * <p>
      * The example below splits input records {@code <null:String>} containing 
sentences as values into their words.
      * <pre>{@code
@@ -530,8 +522,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -544,8 +535,8 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapper} is applied to each input record and 
computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerSupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * <p>
      * The example below splits input records {@code <null:String>} containing 
sentences as values into their words.
      * <pre>{@code
@@ -573,8 +564,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -587,8 +577,8 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapperWithKey} is applied to each input record 
and computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerWithKeySupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * <p>
      * The example below splits input records {@code <Integer:String>}, with 
key=1, containing sentences as values
      * into their words.
@@ -621,8 +611,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -635,8 +624,8 @@ public interface KStream<K, V> {
      * stream (value type can be altered arbitrarily).
      * The provided {@link ValueMapperWithKey} is applied to each input record 
and computes zero or more output values.
      * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K:V'>, <K:V''>, ...}.
-     * This is a stateless record-by-record operation (cf. {@link 
#transformValues(ValueTransformerWithKeySupplier, String...)}
-     * for stateful value transformation).
+     * This is a stateless record-by-record operation (cf. {@link 
#processValues(FixedKeyProcessorSupplier, String...)}
+     * for stateful value processing).
      * <p>
      * The example below splits input records {@code <Integer:String>}, with 
key=1, containing sentences as values
      * into their words.
@@ -670,8 +659,7 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+     * @see #processValues(FixedKeyProcessorSupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
      * @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
      */
@@ -2984,453 +2972,6 @@ public interface KStream<K, V> {
                                          final KeyValueMapper<? super K, ? 
super V, ? extends GK> keySelector,
                                          final ValueJoinerWithKey<? super K, ? 
super V, ? super GV, ? extends RV> valueJoiner,
                                          final Named named);
-
-    /**
-     * Transform the value of each input record into a new value (with 
possibly a new type) of the output record.
-     * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applied to each input
-     * record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #mapValues(ValueMapper) mapValues()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress
-     * can be observed and additional periodic actions can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new 
ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link ValueTransformerSupplier} 
to implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myValueTransformState") is 
used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new 
MyValueTransformerSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link ValueTransformer}, the state is 
obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link ValueTransformer} must return the new value in {@link 
ValueTransformer#transform(Object) transform()}.
-     * No additional {@link KeyValue} pairs can be emitted via
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformer} tries to
-     * emit a {@link KeyValue} pair.
-     * <pre>{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(V value) {
-     *         // can access this.state
-     *         return new NewValueType(); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code transformValues()}.
-     * <p>
-     * Setting a new value preserves data co-location with respect to the key.
-     * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}.
-     *
-     * @param valueTransformerSupplier an instance of {@link 
ValueTransformerSupplier} that generates a newly constructed {@link 
ValueTransformer}
-     *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformer} object
-     *                                 and returning the same object reference 
in {@link ValueTransformer} is a
-     *                                 violation of the supplier pattern and 
leads to runtime exceptions.
-     * @param stateStoreNames          the names of the state stores used by 
the processor; not required if the supplier
-     *                                 implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and 
new values (possibly of different type)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
-     */
-    @Deprecated
-    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super 
V, ? extends VR> valueTransformerSupplier,
-                                        final String... stateStoreNames);
-    /**
-     * Transform the value of each input record into a new value (with 
possibly a new type) of the output record.
-     * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applied to each input
-     * record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #mapValues(ValueMapper) mapValues()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * This is essentially mixing the Processor API into the DSL, and provides 
all the functionality of the PAPI.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress
-     * can be observed and additional periodic actions can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new 
ValueTransformerSupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link ValueTransformerSupplier} 
to implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyValueTransformerSupplier implements ValueTransformerSupplier {
-     *     // supply transformer
-     *     ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myValueTransformState") is 
used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new 
MyValueTransformerSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link ValueTransformer}, the state is 
obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link ValueTransformer} must return the new value in {@link 
ValueTransformer#transform(Object) transform()}.
-     * No additional {@link KeyValue} pairs can be emitted via
-     * pairs can be emitted via
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformer} tries to
-     * emit a {@link KeyValue} pair.
-     * <pre>{@code
-     * class MyValueTransformer implements ValueTransformer {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(V value) {
-     *         // can access this.state
-     *         return new NewValueType(); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code transformValues()}.
-     * <p>
-     * Setting a new value preserves data co-location with respect to the key.
-     * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}.
-     *
-     * @param valueTransformerSupplier an instance of {@link 
ValueTransformerSupplier} that generates a newly constructed {@link 
ValueTransformer}
-     *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformer} object
-     *                                 and returning the same object reference 
in {@link ValueTransformer} is a
-     *                                 violation of the supplier pattern and 
leads to runtime exceptions.
-     * @param named                    a {@link Named} config used to name the 
processor in the topology
-     * @param stateStoreNames          the names of the state stores used by 
the processor; not required if the supplier
-     *                                 implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and 
new values (possibly of different type)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
-     */
-    @Deprecated
-    <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super 
V, ? extends VR> valueTransformerSupplier,
-                                        final Named named,
-                                        final String... stateStoreNames);
-
-    /**
-     * Transform the value of each input record into a new value (with 
possibly a new type) of the output record.
-     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to
-     * each input record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #mapValues(ValueMapperWithKey) mapValues()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * This is essentially mixing the Processor API into the DSL, and provides 
all the functionality of the PAPI.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress
-     * can be observed and additional periodic actions can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new 
ValueTransformerWithKeySupplier() {
-     *     public ValueTransformer get() {
-     *         return new MyValueTransformer();
-     *     }
-     * }, "myValueTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link 
ValueTransformerWithKeySupplier} to implement {@link 
ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyValueTransformerWithKeySupplier implements 
ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myValueTransformState") is 
used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new 
MyValueTransformerWithKeySupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link ValueTransformerWithKey}, the 
state is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link ValueTransformerWithKey} must return the new value in
-     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
-     * No additional {@link KeyValue} pairs can be emitted via
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformerWithKey} tries
-     * to emit a {@link KeyValue} pair.
-     * <pre>{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         return new NewValueType(readOnlyKey); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code transformValues()}.
-     * <p>
-     * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
-     * So, setting a new value preserves data co-location with respect to the 
key.
-     * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}.
-     *
-     * @param valueTransformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a newly constructed {@link 
ValueTransformerWithKey}
-     *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformerWithKey} object
-     *                                 and returning the same object reference 
in {@link ValueTransformerWithKey} is a
-     *                                 violation of the supplier pattern and 
leads to runtime exceptions.
-     * @param stateStoreNames          the names of the state stores used by 
the processor; not required if the supplier
-     *                                 implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and 
new values (possibly of different type)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
-     */
-    @Deprecated
-    <VR> KStream<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
valueTransformerSupplier,
-                                        final String... stateStoreNames);
-
-    /**
-     * Transform the value of each input record into a new value (with 
possibly a new type) of the output record.
-     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to
-     * each input record value and computes a new value for it.
-     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #mapValues(ValueMapperWithKey) mapValues()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * This is essentially mixing the Processor API into the DSL, and provides 
all the functionality of the PAPI.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress
-     * can be observed and additional periodic actions can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transformValues(new 
ValueTransformerWithKeySupplier() {
-     *     public ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     * }, "myValueTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link 
ValueTransformerWithKeySupplier} to implement {@link 
ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyValueTransformerWithKeySupplier implements 
ValueTransformerWithKeySupplier {
-     *     // supply transformer
-     *     ValueTransformerWithKey get() {
-     *         return new MyValueTransformerWithKey();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myValueTransformState") is 
used to access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.transformValues(new 
MyValueTransformerWithKeySupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link ValueTransformerWithKey}, the 
state is obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link ValueTransformerWithKey} must return the new value in
-     * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
-     * No additional {@link KeyValue} pairs can be emitted via
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
-     * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformerWithKey} tries
-     * to emit a {@link KeyValue} pair.
-     * <pre>{@code
-     * class MyValueTransformerWithKey implements ValueTransformerWithKey {
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.state = context.getStateStore("myValueTransformState");
-     *         // punctuate each second, can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     NewValueType transform(K readOnlyKey, V value) {
-     *         // can access this.state and use read-only key
-     *         return new NewValueType(readOnlyKey); // or null
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code transformValues()}.
-     * <p>
-     * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
-     * So, setting a new value preserves data co-location with respect to the 
key.
-     * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}.
-     *
-     * @param valueTransformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a newly constructed {@link 
ValueTransformerWithKey}
-     *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformerWithKey} object
-     *                                 and returning the same object reference 
in {@link ValueTransformerWithKey} is a
-     *                                 violation of the supplier pattern and 
leads to runtime exceptions.
-     * @param named                    a {@link Named} config used to name the 
processor in the topology
-     * @param stateStoreNames          the names of the state stores used by 
the processor; not required if the supplier
-     *                                 implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <VR>                     the value type of the result stream
-     * @return a {@code KStream} that contains records with unmodified key and 
new values (possibly of different type)
-     * @see #mapValues(ValueMapper)
-     * @see #mapValues(ValueMapperWithKey)
-     * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
-     */
-    @Deprecated
-    <VR> KStream<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
valueTransformerSupplier,
-                                        final Named named,
-                                        final String... stateStoreNames);
     /**
      * Transform the value of each input record into zero or more new values 
(with possibly a new
      * type) and emit for each new value a record with the same key of the 
input record and the value.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index 228b1d71231..222cdc1bbc2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -35,7 +35,6 @@ import java.util.function.Supplier;
  * @see Transformer
  * @see ValueTransformer
  * @see ValueTransformerSupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
  * @deprecated Since 4.0. Use {@link 
org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier} 
instead.
  */
 @Deprecated
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 6d4e4fe8f1c..ae1d21334ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -44,8 +44,7 @@ import java.time.Duration;
  * @param <VR> transformed value type
  * @see ValueTransformerSupplier
  * @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see KTable#transformValues(ValueTransformerWithKeySupplier, Materialized, 
String...)
  * @see Transformer
  * @deprecated Since 4.0. Use {@link FixedKeyProcessor} instead.
  */
@@ -77,7 +76,7 @@ public interface ValueTransformer<V, VR> {
 
     /**
      * Transform the given value to a new value.
-     * Additionally, any {@link StateStore} that is {@link 
KStream#transformValues(ValueTransformerSupplier, String...)
+     * Additionally, any {@link StateStore} that is {@link 
KTable#transformValues(ValueTransformerWithKeySupplier, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index b0008744eac..6a4c25b0c1c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -31,8 +31,6 @@ import 
org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
  * @see ValueTransformer
  * @see ValueTransformerWithKey
  * @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  * @see TransformerSupplier
  * @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 9c3552622ad..cc0c38d01ef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -47,8 +47,7 @@ import java.time.Duration;
  * @param <VR> transformed value type
  * @see ValueTransformer
  * @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see KTable#transformValues(ValueTransformerWithKeySupplier, String...) 
  * @see Transformer
  */
 
@@ -77,7 +76,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
 
     /**
      * Transform the given [key and] value to a new value.
-     * Additionally, any {@link StateStore} that is {@link 
KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+     * Additionally, any {@link StateStore} that is {@link 
KTable#transformValues(ValueTransformerWithKeySupplier, Named, String...)
      * attached} to this operator can be accessed and modified arbitrarily (cf.
      * {@link ProcessorContext#getStateStore(String)}).
      * <p>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
index 1c0feb0015e..8b1e995f1c3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
@@ -32,8 +32,6 @@ import java.util.function.Supplier;
  * @param <VR> transformed value type
  * @see ValueTransformer
  * @see ValueTransformerWithKey
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see Transformer
  * @see TransformerSupplier
  */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a23c5ad4b0b..ec2fd211efb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -121,8 +121,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> 
implements KStream<K
 
     private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
 
-    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
-
     private static final String TRANSFORMVALUES_NAME = 
"KSTREAM-TRANSFORMVALUES-";
 
     private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
@@ -1209,77 +1207,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             builder);
     }
 
-    @Override
-    @Deprecated
-    public <VR> KStream<K, VR> transformValues(final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends 
VR> valueTransformerSupplier,
-                                               final String... 
stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformerSupplier can't be null");
-        return doTransformValues(
-            toValueTransformerWithKeySupplier(valueTransformerSupplier),
-            NamedInternal.empty(),
-            stateStoreNames);
-    }
-
-    @Override
-    @Deprecated
-    public <VR> KStream<K, VR> transformValues(final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends 
VR> valueTransformerSupplier,
-                                               final Named named,
-                                               final String... 
stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformerSupplier can't be null");
-        Objects.requireNonNull(named, "named can't be null");
-        return doTransformValues(
-            toValueTransformerWithKeySupplier(valueTransformerSupplier),
-            new NamedInternal(named),
-            stateStoreNames);
-    }
-
-    @Override
-    @Deprecated
-    public <VR> KStream<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
valueTransformerSupplier,
-                                               final String... 
stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformerSupplier can't be null");
-        return doTransformValues(valueTransformerSupplier, 
NamedInternal.empty(), stateStoreNames);
-    }
-
-    @Override
-    @Deprecated
-    public <VR> KStream<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
valueTransformerSupplier,
-                                               final Named named,
-                                               final String... 
stateStoreNames) {
-        Objects.requireNonNull(valueTransformerSupplier, 
"valueTransformerSupplier can't be null");
-        Objects.requireNonNull(named, "named can't be null");
-        return doTransformValues(valueTransformerSupplier, new 
NamedInternal(named), stateStoreNames);
-    }
-
-    private <VR> KStream<K, VR> doTransformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
valueTransformerWithKeySupplier,
-                                                  final NamedInternal named,
-                                                  final String... 
stateStoreNames) {
-        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a 
null array");
-        for (final String stateStoreName : stateStoreNames) {
-            Objects.requireNonNull(stateStoreName, "stateStoreNames can't 
contain `null` as store name");
-        }
-        ApiUtils.checkSupplier(valueTransformerWithKeySupplier);
-
-        final String name = named.orElseGenerateWithPrefix(builder, 
TRANSFORMVALUES_NAME);
-        final StatefulProcessorNode<? super K, ? super V> transformNode = new 
StatefulProcessorNode<>(
-            name,
-            new ProcessorParameters<>(new 
KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
-            stateStoreNames);
-        transformNode.setValueChangingOperation(true);
-
-        builder.addGraphNode(graphNode, transformNode);
-
-        // cannot inherit value serde
-        return new KStreamImpl<>(
-            name,
-            keySerde,
-            null,
-            subTopologySourceNodes,
-            repartitionRequired,
-            transformNode,
-            builder);
-    }
-
     @Override
     @Deprecated
     public <VR> KStream<K, VR> flatTransformValues(final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, 
Iterable<VR>> valueTransformerSupplier,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
deleted file mode 100644
index 1b767ef3969..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
-import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Set;
-
-public class KStreamTransformValues<KIn, VIn, VOut> implements 
ProcessorSupplier<KIn, VIn, KIn, VOut> {
-
-    private final ValueTransformerWithKeySupplier<KIn, VIn, VOut> 
valueTransformerSupplier;
-
-    KStreamTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn, 
VOut> valueTransformerSupplier) {
-        this.valueTransformerSupplier = valueTransformerSupplier;
-    }
-
-    @Override
-    public Processor<KIn, VIn, KIn, VOut> get() {
-        return new 
KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
-    }
-
-    @Override
-    public Set<StoreBuilder<?>> stores() {
-        return valueTransformerSupplier.stores();
-    }
-
-    public static class KStreamTransformValuesProcessor<KIn, VIn, VOut> 
extends ContextualProcessor<KIn, VIn, KIn, VOut> {
-
-        private final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer;
-
-        KStreamTransformValuesProcessor(final ValueTransformerWithKey<KIn, 
VIn, VOut> valueTransformer) {
-            this.valueTransformer = valueTransformer;
-        }
-
-        @Override
-        public void init(final ProcessorContext<KIn, VOut> context) {
-            super.init(context);
-            valueTransformer.init(new 
ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>) 
context));
-        }
-
-        @Override
-        public void process(final Record<KIn, VIn> record) {
-            
context().forward(record.withValue(valueTransformer.transform(record.key(), 
record.value())));
-        }
-
-        @Override
-        public void close() {
-            valueTransformer.close();
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
index 91824d5a5b8..108a7d7233b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
 import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.util.Set;
@@ -91,10 +92,7 @@ import java.util.Set;
  * @see Topology#addProcessor(String, 
org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
  * @see 
KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, 
String...)
  * @see 
KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, 
Named, String...)
- * @see 
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 String...)
- * @see 
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 Named, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, 
String...)
+ * @see KStream#processValues(FixedKeyProcessorSupplier, String...)
 * @see 
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 String...)
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 String...)
  * @see 
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 Named, String...)
  * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index aa646e873cf..6f79aee9dbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -845,24 +845,6 @@ public class StreamsBuilderTest {
         assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", 
STREAM_OPERATION_NAME);
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseSpecifiedNameForTransformValues() {
-        builder.stream(STREAM_TOPIC).transformValues(() -> new 
NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME));
-        builder.build();
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).buildTopology();
-        assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", 
STREAM_OPERATION_NAME);
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldUseSpecifiedNameForTransformValuesWithKey() {
-        builder.stream(STREAM_TOPIC).transformValues(() -> new 
NoopValueTransformerWithKey<>(), Named.as(STREAM_OPERATION_NAME));
-        builder.build();
-        final ProcessorTopology topology = 
builder.internalTopologyBuilder.rewriteTopology(new 
StreamsConfig(props)).buildTopology();
-        assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", 
STREAM_OPERATION_NAME);
-    }
-
     @Test
     public void shouldUseSpecifiedNameForSplitOperation() {
         builder.stream(STREAM_TOPIC)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 226fc357a6a..b78696f259a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -114,33 +114,6 @@ public class KStreamImplTest {
     private final MockApiProcessorSupplier<String, String, Void, Void> 
processorSupplier = new MockApiProcessorSupplier<>();
     private final MockApiFixedKeyProcessorSupplier<String, String, Void> 
fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
     @SuppressWarnings("deprecation")
-    private final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String> 
valueTransformerSupplier =
-        () -> new org.apache.kafka.streams.kstream.ValueTransformer<String, 
String>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public String transform(final String value) {
-                return value;
-            }
-
-            @Override
-            public void close() {}
-        };
-    private final ValueTransformerWithKeySupplier<String, String, String> 
valueTransformerWithKeySupplier =
-        () -> new ValueTransformerWithKey<String, String, String>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public String transform(final String key, final String value) {
-                return value;
-            }
-
-            @Override
-            public void close() {}
-        };
-    @SuppressWarnings("deprecation")
     private final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, 
Iterable<String>> flatValueTransformerSupplier =
         () -> new org.apache.kafka.streams.kstream.ValueTransformer<String, 
Iterable<String>>() {
             @Override
@@ -1213,9 +1186,6 @@ public class KStreamImplTest {
         assertEquals(((AbstractStream) 
stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde());
         assertNull(((AbstractStream) 
stream1.flatMapValues(flatMapper)).valueSerde());
 
-        assertEquals(((AbstractStream) 
stream1.transformValues(valueTransformerSupplier)).keySerde(), 
consumedInternal.keySerde());
-        assertNull(((AbstractStream) 
stream1.transformValues(valueTransformerSupplier)).valueSerde());
-
         assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
         assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());
 
@@ -1589,7 +1559,7 @@ public class KStreamImplTest {
             processorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
                 IllegalArgumentException.class,
-            () -> testStream.process(() -> processor, 
Named.as("flatTransformer"))
+            () -> testStream.process(() -> processor, Named.as("processor"))
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
@@ -1606,272 +1576,49 @@ public class KStreamImplTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
-        final org.apache.kafka.streams.kstream.ValueTransformer<String, 
String> transformer = valueTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessValues() {
+        final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String, 
String, Void> processor =
+            fixedKeyProcessorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-            () -> testStream.transformValues(() -> transformer)
+            IllegalArgumentException.class,
+            () -> testStream.processValues(() -> processor)
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowBadTransformerSupplierOnTransformValuesWithNamed() {
-        final org.apache.kafka.streams.kstream.ValueTransformer<String, 
String> transformer = valueTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessValuesWithStores() {
+        final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String, 
String, Void> processor =
+            fixedKeyProcessorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-            () -> testStream.transformValues(() -> transformer, 
Named.as("transformer"))
+            IllegalArgumentException.class,
+            () -> testStream.processValues(() -> processor, "storeName")
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> 
testStream.transformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object,
 Object>) null));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValues() {
-        final ValueTransformerWithKey<String, String, String> transformer = 
valueTransformerWithKeySupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamed() {
+        final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String, 
String, Void> processor =
+            fixedKeyProcessorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-            () -> testStream.transformValues(() -> transformer)
+            IllegalArgumentException.class,
+            () -> testStream.processValues(() -> processor, 
Named.as("processor"))
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
-        final ValueTransformerWithKey<String, String, String> transformer = 
valueTransformerWithKeySupplier.get();
+    public void 
shouldNotAllowBadProcessSupplierOnProcessValuesWithNamedAndStores() {
+        final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String, 
String, Void> processor =
+            fixedKeyProcessorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
-                IllegalArgumentException.class,
-            () -> testStream.transformValues(() -> transformer, 
Named.as("transformer"))
+            IllegalArgumentException.class,
+            () -> testStream.processValues(() -> processor, 
Named.as("processor"), "storeName")
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> 
testStream.transformValues((ValueTransformerWithKeySupplier<Object, Object, 
Object>) null));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) 
null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) 
null,
-                Named.as("valueTransformer")));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
-                Named.as("valueTransformerWithKey")));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamedAndStores() 
{
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>) 
null,
-                Named.as("valueTransformer"),
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamedAndStores()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
-                Named.as("valueTransformerWithKey"),
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("valueTransformerSupplier 
can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier,
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplier()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier, (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplier()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplierWithNamed()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier,
-                Named.as("valueTransformer"),
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplierWithNamed()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                Named.as("valueTransformer"),
-                (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplierWithNamed()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier,
-                Named.as("valueTransformer"),
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplierWithName()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                Named.as("valueTransformerWithKey"),
-                (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier,
-                (Named) null));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplier() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                (Named) null));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplierAndStores() 
{
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerSupplier,
-                (Named) null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplierAndStores()
 {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.transformValues(
-                valueTransformerWithKeySupplier,
-                (Named) null,
-                "storeName"));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
     @Test
     @SuppressWarnings("deprecation")
     public void 
shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
deleted file mode 100644
index 5cb51ae7beb..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.api.Processor;
-import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
-import org.apache.kafka.test.StreamsTestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.mockito.Mockito.mock;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KStreamTransformValuesTest {
-    private final String topicName = "topic";
-    private final MockProcessorSupplier<Integer, Integer, Void, Void> supplier 
= new MockProcessorSupplier<>();
-    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
-    private InternalProcessorContext context = 
mock(InternalProcessorContext.class);
-
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    @Test
-    public void testTransform() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<Number, Integer> 
valueTransformerSupplier =
-            () -> new 
org.apache.kafka.streams.kstream.ValueTransformer<Number, Integer>() {
-                private int total = 0;
-
-                @Override
-                public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) { }
-
-                @Override
-                public Integer transform(final Number value) {
-                    total += value.intValue();
-                    return total;
-                }
-
-                @Override
-                public void close() { }
-            };
-
-        final int[] expectedKeys = {1, 10, 100, 1000};
-
-        final KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), 
Serdes.Integer()));
-        stream.transformValues(valueTransformerSupplier).process(supplier);
-
-        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
-            for (final int expectedKey : expectedKeys) {
-                final TestInputTopic<Integer, Integer> inputTopic =
-                        driver.createInputTopic(topicName, new 
IntegerSerializer(), new IntegerSerializer());
-                inputTopic.pipeInput(expectedKey, expectedKey * 10, 
expectedKey / 2L);
-            }
-        }
-        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 10, 
0),
-            new KeyValueTimestamp<>(10, 110, 5),
-            new KeyValueTimestamp<>(100, 1110, 50),
-            new KeyValueTimestamp<>(1000, 11110, 500)};
-
-        assertArrayEquals(expected, 
supplier.theCapturedProcessor().processed().toArray());
-    }
-
-    @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
-    @Test
-    public void testTransformWithKey() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final ValueTransformerWithKeySupplier<Integer, Number, Integer> 
valueTransformerSupplier =
-            () -> new ValueTransformerWithKey<Integer, Number, Integer>() {
-                private int total = 0;
-
-                @Override
-                public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) { }
-
-                @Override
-                public Integer transform(final Integer readOnlyKey, final 
Number value) {
-                    total += value.intValue() + readOnlyKey;
-                    return total;
-                }
-
-                @Override
-                public void close() { }
-            };
-
-        final int[] expectedKeys = {1, 10, 100, 1000};
-
-        final KStream<Integer, Integer> stream;
-        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), 
Serdes.Integer()));
-        stream.transformValues(valueTransformerSupplier).process(supplier);
-
-        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
-            final TestInputTopic<Integer, Integer> inputTopic =
-                    driver.createInputTopic(topicName, new 
IntegerSerializer(), new IntegerSerializer());
-            for (final int expectedKey : expectedKeys) {
-                inputTopic.pipeInput(expectedKey, expectedKey * 10, 
expectedKey / 2L);
-            }
-        }
-        final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 11, 
0),
-            new KeyValueTimestamp<>(10, 121, 5),
-            new KeyValueTimestamp<>(100, 1221, 50),
-            new KeyValueTimestamp<>(1000, 12221, 500)};
-
-        assertArrayEquals(expected, 
supplier.theCapturedProcessor().processed().toArray());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
shouldInitializeTransformerWithForwardDisabledProcessorContext() {
-        final NoOpValueTransformerWithKeySupplier<String, String> transformer 
= new NoOpValueTransformerWithKeySupplier<>();
-        final KStreamTransformValues<String, String, String> transformValues = 
new KStreamTransformValues<>(transformer);
-        final Processor<String, String, String, String> processor = 
transformValues.get();
-
-        processor.init(context);
-
-        assertThat(transformer.context, isA((Class) 
ForwardingDisabledProcessorContext.class));
-    }
-}
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5e6cc4f3f22..80d43fc2315 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -584,94 +584,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   ): KStream[K, VR] =
     new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava, 
named, stateStoreNames: _*))
 
-  /**
-   * Transform the value of each input record into a new value (with possible 
new type) of the output record.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) 
is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` 
that generates a `ValueTransformer`
-   * @param stateStoreNames          the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use 
processValues(FixedKeyProcessorSupplier, String*) instead.")
-  def transformValues[VR](
-    valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.transformValues[VR](valueTransformerSupplier, 
stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into a new value (with possible 
new type) of the output record.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) 
is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of `ValueTransformerSupplier` 
that generates a `ValueTransformer`
-   * @param named                    a [[Named]] config used to name the 
processor in the topology
-   * @param stateStoreNames          the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use 
processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
-  def transformValues[VR](
-    valueTransformerSupplier: ValueTransformerSupplier[V, VR],
-    named: Named,
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.transformValues[VR](valueTransformerSupplier, named, 
stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into a new value (with possible 
new type) of the output record.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) 
is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of 
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
-   * @param stateStoreNames          the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use 
processValues(FixedKeyProcessorSupplier, String*) instead.")
-  def transformValues[VR](
-    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.transformValues[VR](valueTransformerSupplier, 
stateStoreNames: _*))
-
-  /**
-   * Transform the value of each input record into a new value (with possible 
new type) of the output record.
-   * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`) 
is applied to each input
-   * record value and computes a new value for it.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `ValueTransformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param valueTransformerSupplier a instance of 
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
-   * @param named                    a [[Named]] config used to name the 
processor in the topology
-   * @param stateStoreNames          the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
-   */
-  @deprecated(since = "3.3", message = "Use 
processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
-  def transformValues[VR](
-    valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
-    named: Named,
-    stateStoreNames: String*
-  ): KStream[K, VR] =
-    new KStream(inner.transformValues[VR](valueTransformerSupplier, named, 
stateStoreNames: _*))
-
   /**
    * Process all records in this stream, one record at a time, by applying a 
`Processor` (provided by the given
    * `processorSupplier`).


Reply via email to