This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95650431dfd KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated 
"transformer" methods and classes (#17245)
95650431dfd is described below

commit 95650431dfd2072cfd79efd5536d9eff7b50f3d9
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Thu Nov 7 22:35:17 2024 -0300

    KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated "transformer" 
methods and classes (#17245)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/streams/kstream/KStream.java  | 297 +--------------------
 .../streams/kstream/internals/KStreamImpl.java     |  42 ---
 .../streams/processor/ConnectedStoreProvider.java  |   2 -
 .../streams/kstream/internals/KStreamImplTest.java | 136 ++--------
 .../kafka/streams/scala/kstream/KStream.scala      |  49 ----
 .../kafka/streams/scala/kstream/KStreamTest.scala  |  38 ---
 6 files changed, 25 insertions(+), 539 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a5679becc51..516ffb80227 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -436,7 +436,6 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -488,7 +487,6 @@ public interface KStream<K, V> {
      * @see #flatMapValues(ValueMapper)
      * @see #flatMapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -532,7 +530,6 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -576,7 +573,6 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -625,7 +621,6 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -675,7 +670,6 @@ public interface KStream<K, V> {
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
      * @see #process(ProcessorSupplier, String...)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @see #transformValues(ValueTransformerSupplier, String...)
      * @see #transformValues(ValueTransformerWithKeySupplier, String...)
      * @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -2989,266 +2983,6 @@ public interface KStream<K, V> {
                                          final ValueJoinerWithKey<? super K, ? 
super V, ? super GV, ? extends RV> valueJoiner,
                                          final Named named);
 
-    /**
-     * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
-     * can be altered arbitrarily).
-     * A {@link Transformer} (provided by the given {@link 
TransformerSupplier}) is applied to each input record and
-     * returns zero or more output records.
-     * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K':V'>, <K'':V''>, ...}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #flatMap(KeyValueMapper) flatMap()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #flatMap(KeyValueMapper) flatMap()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}
-     * the processing progress can be observed and additional periodic actions 
can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() {
-     *     public Transformer get() {
-     *         return new MyTransformer();
-     *     }
-     * }, "myTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link TransformerSupplier} to 
implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyTransformerSupplier implements TransformerSupplier {
-     *     // supply transformer
-     *     Transformer get() {
-     *         return new MyTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myTransformState") is used to 
access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransform(new 
MyTransformerSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link Transformer}, the state is 
obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link Transformer} must return an {@link java.lang.Iterable} type 
(e.g., any {@link java.util.Collection}
-     * type) in {@link Transformer#transform(Object, Object) transform()}.
-     * The return value of {@link Transformer#transform(Object, Object) 
Transformer#transform()} may be {@code null},
-     * which is equal to returning an empty {@link java.lang.Iterable 
Iterable}, i.e., no records are emitted.
-     * <pre>{@code
-     * class MyTransformer implements Transformer {
-     *     private ProcessorContext context;
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.context = context;
-     *         this.state = context.getStateStore("myTransformState");
-     *         // punctuate each second; can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<KeyValue> transform(K key, V value) {
-     *         // can access this.state
-     *         List<KeyValue> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new KeyValue(key, value));
-     *         }
-     *         return result; // emits a list of key-value pairs via return
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code flatTransform()}.
-     * <p>
-     * Transforming records might result in an internal data redistribution if 
a key based operator (like an aggregation
-     * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) 
transformValues()})
-     * <p>
-     * Note that it is possible to emit records by using
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
-     * context#forward()} in {@link Transformer#transform(Object, Object) 
Transformer#transform()} and
-     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}.
-     * Be aware that a mismatch between the types of the emitted records and 
the type of the stream would only be
-     * detected at runtime.
-     * To ensure type-safety at compile-time,
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
context#forward()} should
-     * not be used in {@link Transformer#transform(Object, Object) 
Transformer#transform()} and
-     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}.
-     * The supplier should always generate a new instance each time {@link 
TransformerSupplier#get()} gets called. Creating
-     * a single {@link Transformer} object and returning the same object 
reference in {@link TransformerSupplier#get()} would be
-     * a violation of the supplier pattern and leads to runtime exceptions.
-     *
-     * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a newly constructed {@link Transformer}
-     * @param stateStoreNames     the names of the state stores used by the 
processor; not required if the supplier
-     *                            implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <K1>                the key type of the new stream
-     * @param <V1>                the value type of the new stream
-     * @return a {@code KStream} that contains more or less records with new 
key and value (possibly of different type)
-     * @see #flatMap(KeyValueMapper)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
-     * @see #process(ProcessorSupplier, String...)
-     * @deprecated Since 3.3. Use {@link KStream#process(ProcessorSupplier, 
String...)} instead.
-     */
-    @Deprecated
-    <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super 
K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                           final String... stateStoreNames);
-
-    /**
-     * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
-     * can be altered arbitrarily).
-     * A {@link Transformer} (provided by the given {@link 
TransformerSupplier}) is applied to each input record and
-     * returns zero or more output records.
-     * Thus, an input record {@code <K,V>} can be transformed into output 
records {@code <K':V'>, <K'':V''>, ...}.
-     * Attaching a state store makes this a stateful record-by-record 
operation (cf. {@link #flatMap(KeyValueMapper) flatMap()}).
-     * If you choose not to attach one, this operation is similar to the 
stateless {@link #flatMap(KeyValueMapper) flatMap()}
-     * but allows access to the {@code ProcessorContext} and record metadata.
-     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}
-     * the processing progress can be observed and additional periodic actions 
can be performed.
-     * <p>
-     * In order for the transformer to use state stores, the stores must be 
added to the topology and connected to the
-     * transformer using at least one of two strategies (though it's not 
required to connect global state stores; read-only
-     * access to global state stores is available by default).
-     * <p>
-     * The first strategy is to manually add the {@link StoreBuilder}s via 
{@link Topology#addStateStore(StoreBuilder, String...)},
-     * and specify the store names via {@code stateStoreNames} so they will be 
connected to the transformer.
-     * <pre>{@code
-     * // create store
-     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
-     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
-     *                 Serdes.String(),
-     *                 Serdes.String());
-     * // add store
-     * builder.addStateStore(keyValueStoreBuilder);
-     *
-     * KStream outputStream = inputStream.transform(new TransformerSupplier() {
-     *     public Transformer get() {
-     *         return new MyTransformer();
-     *     }
-     * }, "myTransformState");
-     * }</pre>
-     * The second strategy is for the given {@link TransformerSupplier} to 
implement {@link ConnectedStoreProvider#stores()},
-     * which provides the {@link StoreBuilder}s to be automatically added to 
the topology and connected to the transformer.
-     * <pre>{@code
-     * class MyTransformerSupplier implements TransformerSupplier {
-     *     // supply transformer
-     *     Transformer get() {
-     *         return new MyTransformer();
-     *     }
-     *
-     *     // provide store(s) that will be added and connected to the 
associated transformer
-     *     // the store name from the builder ("myTransformState") is used to 
access the store later via the ProcessorContext
-     *     Set<StoreBuilder> stores() {
-     *         StoreBuilder<KeyValueStore<String, String>> 
keyValueStoreBuilder =
-     *                   
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
-     *                   Serdes.String(),
-     *                   Serdes.String());
-     *         return Collections.singleton(keyValueStoreBuilder);
-     *     }
-     * }
-     *
-     * ...
-     *
-     * KStream outputStream = inputStream.flatTransform(new 
MyTransformerSupplier());
-     * }</pre>
-     * <p>
-     * With either strategy, within the {@link Transformer}, the state is 
obtained via the {@link ProcessorContext}.
-     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
-     * a schedule must be registered.
-     * The {@link Transformer} must return an {@link java.lang.Iterable} type 
(e.g., any {@link java.util.Collection}
-     * type) in {@link Transformer#transform(Object, Object) transform()}.
-     * The return value of {@link Transformer#transform(Object, Object) 
Transformer#transform()} may be {@code null},
-     * which is equal to returning an empty {@link java.lang.Iterable 
Iterable}, i.e., no records are emitted.
-     * <pre>{@code
-     * class MyTransformer implements Transformer {
-     *     private ProcessorContext context;
-     *     private StateStore state;
-     *
-     *     void init(ProcessorContext context) {
-     *         this.context = context;
-     *         this.state = context.getStateStore("myTransformState");
-     *         // punctuate each second; can access this.state
-     *         context.schedule(Duration.ofSeconds(1), 
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
-     *     }
-     *
-     *     Iterable<KeyValue> transform(K key, V value) {
-     *         // can access this.state
-     *         List<KeyValue> result = new ArrayList<>();
-     *         for (int i = 0; i < 3; i++) {
-     *             result.add(new KeyValue(key, value));
-     *         }
-     *         return result; // emits a list of key-value pairs via return
-     *     }
-     *
-     *     void close() {
-     *         // can access this.state
-     *     }
-     * }
-     * }</pre>
-     * Even if any upstream operation was key-changing, no auto-repartition is 
triggered.
-     * If repartitioning is required, a call to {@link #repartition()} should 
be performed before
-     * {@code flatTransform()}.
-     * <p>
-     * Transforming records might result in an internal data redistribution if 
a key based operator (like an aggregation
-     * or join) is applied to the result {@code KStream}.
-     * (cf. {@link #transformValues(ValueTransformerSupplier, String...) 
transformValues()})
-     * <p>
-     * Note that it is possible to emit records by using
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
-     * context#forward()} in {@link Transformer#transform(Object, Object) 
Transformer#transform()} and
-     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}.
-     * Be aware that a mismatch between the types of the emitted records and 
the type of the stream would only be
-     * detected at runtime.
-     * To ensure type-safety at compile-time,
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
context#forward()} should
-     * not be used in {@link Transformer#transform(Object, Object) 
Transformer#transform()} and
-     * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) 
Punctuator#punctuate()}.
-     * The supplier should always generate a new instance each time {@link 
TransformerSupplier#get()} gets called. Creating
-     * a single {@link Transformer} object and returning the same object 
reference in {@link TransformerSupplier#get()} would be
-     * a violation of the supplier pattern and leads to runtime exceptions.
-     *
-     * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a newly constructed {@link Transformer}
-     * @param named               a {@link Named} config used to name the 
processor in the topology
-     * @param stateStoreNames     the names of the state stores used by the 
processor; not required if the supplier
-     *                            implements {@link 
ConnectedStoreProvider#stores()}
-     * @param <K1>                the key type of the new stream
-     * @param <V1>                the value type of the new stream
-     * @return a {@code KStream} that contains more or less records with new 
key and value (possibly of different type)
-     * @see #flatMap(KeyValueMapper)
-     * @see #transformValues(ValueTransformerSupplier, String...)
-     * @see #transformValues(ValueTransformerWithKeySupplier, String...)
-     * @see #process(ProcessorSupplier, String...)
-     * @deprecated Since 3.3. Use {@link KStream#process(ProcessorSupplier, 
Named, String...)} instead.
-     */
-    @Deprecated
-    <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super 
K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                           final Named named,
-                                           final String... stateStoreNames);
-
     /**
      * Transform the value of each input record into a new value (with 
possibly a new type) of the output record.
      * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applied to each input
@@ -3645,8 +3379,7 @@ public interface KStream<K, V> {
      * a schedule must be registered.
      * The {@link ValueTransformerWithKey} must return the new value in
      * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
-     * In contrast to {@link #flatTransform(TransformerSupplier, String...) 
flatTransform()}, no additional {@link
-     * KeyValue} pairs can be emitted via
+     * No additional {@link KeyValue} pairs can be emitted via
      * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformerWithKey} tries
      * to emit a {@link KeyValue} pair.
@@ -3763,8 +3496,7 @@ public interface KStream<K, V> {
      * transform()}.
      * If the return value of {@link ValueTransformer#transform(Object) 
ValueTransformer#transform()} is an empty
      * {@link java.lang.Iterable Iterable} or {@code null}, no records are 
emitted.
-     * In contrast to {@link #flatTransform(TransformerSupplier, String...) 
flatTransform()}, no additional {@link
-     * KeyValue} pairs can be emitted via
+     * No additional {@link KeyValue} pairs can be emitted via
      * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformer} tries to
      * emit a {@link KeyValue} pair.
@@ -3798,8 +3530,7 @@ public interface KStream<K, V> {
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link 
#flatTransform(TransformerSupplier, String...)
-     * flatTransform()})
+     * is applied to the result {@code KStream}.
      *
      * @param valueTransformerSupplier an instance of {@link 
ValueTransformerSupplier} that generates a newly constructed {@link 
ValueTransformer}
      *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformer} object
@@ -3812,7 +3543,6 @@ public interface KStream<K, V> {
      * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
      */
     @Deprecated
@@ -3886,8 +3616,7 @@ public interface KStream<K, V> {
      * transform()}.
      * If the return value of {@link ValueTransformer#transform(Object) 
ValueTransformer#transform()} is an empty
      * {@link java.lang.Iterable Iterable} or {@code null}, no records are 
emitted.
-     * In contrast to {@link #flatTransform(TransformerSupplier, String...) 
flatTransform()}, no additional {@link
-     * KeyValue} pairs can be emitted via
+     * No additional {@link KeyValue} pairs can be emitted via
      * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformer} tries to
      * emit a {@link KeyValue} pair.
@@ -3921,8 +3650,7 @@ public interface KStream<K, V> {
      * <p>
      * Setting a new value preserves data co-location with respect to the key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link 
#flatTransform(TransformerSupplier, String...)
-     * flatTransform()})
+     * is applied to the result {@code KStream}.
      *
      * @param valueTransformerSupplier an instance of {@link 
ValueTransformerSupplier} that generates a newly constructed {@link 
ValueTransformer}
      *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformer} object
@@ -3936,7 +3664,6 @@ public interface KStream<K, V> {
      * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
      */
     @Deprecated
@@ -4011,8 +3738,7 @@ public interface KStream<K, V> {
      * transform()}.
      * If the return value of {@link ValueTransformerWithKey#transform(Object, 
Object) ValueTransformerWithKey#transform()}
      * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no 
records are emitted.
-     * In contrast to {@link #flatTransform(TransformerSupplier, String...) 
flatTransform()}, no additional {@link
-     * KeyValue} pairs can be emitted via
+     * No additional {@link KeyValue} pairs can be emitted via
      * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformerWithKey} tries
      * to emit a {@link KeyValue} pair.
@@ -4047,8 +3773,7 @@ public interface KStream<K, V> {
      * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the 
key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link 
#flatTransform(TransformerSupplier, String...)
-     * flatTransform()})
+     * is applied to the result {@code KStream}.
      *
      * @param valueTransformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a newly constructed {@link 
ValueTransformerWithKey}
      *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformerWithKey} object
@@ -4061,7 +3786,6 @@ public interface KStream<K, V> {
      * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
      */
     @Deprecated
@@ -4135,8 +3859,7 @@ public interface KStream<K, V> {
      * transform()}.
      * If the return value of {@link ValueTransformerWithKey#transform(Object, 
Object) ValueTransformerWithKey#transform()}
      * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no 
records are emitted.
-     * In contrast to {@link #flatTransform(TransformerSupplier, String...) 
flatTransform()}, no additional {@link
-     * KeyValue} pairs can be emitted via
+     * No additional {@link KeyValue} pairs can be emitted via
      * {@link 
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) 
ProcessorContext.forward()}.
      * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if 
the {@link ValueTransformerWithKey} tries
      * to emit a {@link KeyValue} pair.
@@ -4171,8 +3894,7 @@ public interface KStream<K, V> {
      * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
      * So, setting a new value preserves data co-location with respect to the 
key.
      * Thus, <em>no</em> internal data redistribution is required if a key 
based operator (like an aggregation or join)
-     * is applied to the result {@code KStream}. (cf. {@link 
#flatTransform(TransformerSupplier, String...)
-     * flatTransform()})
+     * is applied to the result {@code KStream}.
      *
      * @param valueTransformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a newly constructed {@link 
ValueTransformerWithKey}
      *                                 The supplier should always generate a 
new instance. Creating a single {@link ValueTransformerWithKey} object
@@ -4186,7 +3908,6 @@ public interface KStream<K, V> {
      * different type)
      * @see #mapValues(ValueMapper)
      * @see #mapValues(ValueMapperWithKey)
-     * @see #flatTransform(TransformerSupplier, String...)
      * @deprecated Since 3.3. Use {@link 
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
      */
     @Deprecated
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 3dfd1096bc4..a23c5ad4b0b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1209,48 +1209,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, 
V> implements KStream<K
             builder);
     }
 
-    @Override
-    @Deprecated
-    public <K1, V1> KStream<K1, V1> flatTransform(final 
org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, 
Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                                  final String... 
stateStoreNames) {
-        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
-        final String name = builder.newProcessorName(TRANSFORM_NAME);
-        return flatTransform(transformerSupplier, Named.as(name), 
stateStoreNames);
-    }
-
-    @Override
-    @Deprecated
-    public <K1, V1> KStream<K1, V1> flatTransform(final 
org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V, 
Iterable<KeyValue<K1, V1>>> transformerSupplier,
-                                                  final Named named,
-                                                  final String... 
stateStoreNames) {
-        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't 
be null");
-        Objects.requireNonNull(named, "named can't be null");
-        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a 
null array");
-        ApiUtils.checkSupplier(transformerSupplier);
-        for (final String stateStoreName : stateStoreNames) {
-            Objects.requireNonNull(stateStoreName, "stateStoreNames can't 
contain `null` as store name");
-        }
-
-        final String name = new NamedInternal(named).name();
-        final StatefulProcessorNode<? super K, ? super V> transformNode = new 
StatefulProcessorNode<>(
-            name,
-            new ProcessorParameters<>(new 
KStreamFlatTransform<>(transformerSupplier), name),
-            stateStoreNames);
-        transformNode.keyChangingOperation(true);
-
-        builder.addGraphNode(graphNode, transformNode);
-
-        // cannot inherit key and value serde
-        return new KStreamImpl<>(
-            name,
-            null,
-            null,
-            subTopologySourceNodes,
-            true,
-            transformNode,
-            builder);
-    }
-
     @Override
     @Deprecated
     public <VR> KStream<K, VR> transformValues(final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends 
VR> valueTransformerSupplier,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
index f5bc9cf03c0..91824d5a5b8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
@@ -95,8 +95,6 @@ import java.util.Set;
  * @see 
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 Named, String...)
  * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
  * @see KStream#transformValues(ValueTransformerWithKeySupplier, Named, 
String...)
- * @see 
KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, 
String...)
- * @see 
KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier, 
Named, String...)
  * @see 
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 String...)
  * @see 
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
 Named, String...)
  * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3a65ba5fd83..226fc357a6a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -114,20 +114,6 @@ public class KStreamImplTest {
     private final MockApiProcessorSupplier<String, String, Void, Void> 
processorSupplier = new MockApiProcessorSupplier<>();
     private final MockApiFixedKeyProcessorSupplier<String, String, Void> 
fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
     @SuppressWarnings("deprecation")
-    private final org.apache.kafka.streams.kstream.TransformerSupplier<String, 
String, Iterable<KeyValue<String, String>>> flatTransformerSupplier =
-        () -> new org.apache.kafka.streams.kstream.Transformer<String, String, 
Iterable<KeyValue<String, String>>>() {
-            @Override
-            public void init(final ProcessorContext context) {}
-
-            @Override
-            public Iterable<KeyValue<String, String>> transform(final String 
key, final String value) {
-                return Collections.singleton(new KeyValue<>(key, value));
-            }
-
-            @Override
-            public void close() {}
-        };
-    @SuppressWarnings("deprecation")
     private final 
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String> 
valueTransformerSupplier =
         () -> new org.apache.kafka.streams.kstream.ValueTransformer<String, 
String>() {
             @Override
@@ -1576,139 +1562,49 @@ public class KStreamImplTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowBadTransformerSupplierOnFlatTransform() {
-        final org.apache.kafka.streams.kstream.Transformer<String, String, 
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcess() {
+        final org.apache.kafka.streams.processor.api.Processor<String, String, 
Void, Void> processor =
+            processorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
             IllegalArgumentException.class,
-            () -> testStream.flatTransform(() -> transformer)
+            () -> testStream.process(() -> processor)
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowBadTransformerSupplierOnFlatTransformWithStores() {
-        final org.apache.kafka.streams.kstream.Transformer<String, String, 
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessWithStores() {
+        final org.apache.kafka.streams.processor.api.Processor<String, String, 
Void, Void> processor =
+            processorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
                 IllegalArgumentException.class,
-            () -> testStream.flatTransform(() -> transformer, "storeName")
+            () -> testStream.process(() -> processor, "storeName")
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamed() 
{
-        final org.apache.kafka.streams.kstream.Transformer<String, String, 
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessWithNamed() {
+        final org.apache.kafka.streams.processor.api.Processor<String, String, 
Void, Void> processor =
+            processorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
                 IllegalArgumentException.class,
-            () -> testStream.flatTransform(() -> transformer, 
Named.as("flatTransformer"))
+            () -> testStream.process(() -> processor, 
Named.as("flatTransformer"))
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamedAndStores() {
-        final org.apache.kafka.streams.kstream.Transformer<String, String, 
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+    public void shouldNotAllowBadProcessSupplierOnProcessWithNamedAndStores() {
+        final org.apache.kafka.streams.processor.api.Processor<String, String, 
Void, Void> processor =
+            processorSupplier.get();
         final IllegalArgumentException exception = assertThrows(
                 IllegalArgumentException.class,
-            () -> testStream.flatTransform(() -> transformer, 
Named.as("flatTransformer"), "storeName")
+            () -> testStream.process(() -> processor, Named.as("processor"), 
"storeName")
         );
         assertThat(exception.getMessage(), containsString("#get() must return 
a new object each time it is called."));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(null));
-        assertThat(exception.getMessage(), equalTo("transformerSupplier can't 
be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullTransformerSupplierOnFlatTransformWithStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(null, "storeName"));
-        assertThat(exception.getMessage(), equalTo("transformerSupplier can't 
be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(null, Named.as("flatTransformer")));
-        assertThat(exception.getMessage(), equalTo("transformerSupplier can't 
be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void 
shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamedAndStores() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(null, Named.as("flatTransformer"), 
"storeName"));
-        assertThat(exception.getMessage(), equalTo("transformerSupplier can't 
be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransform() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, (String[]) 
null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransform() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, (String) 
null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNamesOnFlatTransformWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, 
Named.as("flatTransform"), (String[]) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a 
null array"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullStoreNameOnFlatTransformWithNamed() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, 
Named.as("flatTransform"), (String) null));
-        assertThat(exception.getMessage(), equalTo("stateStoreNames can't 
contain `null` as store name"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransform() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, (Named) 
null));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
-    @Test
-    @SuppressWarnings("deprecation")
-    public void shouldNotAllowNullNamedOnFlatTransformWithStoreName() {
-        final NullPointerException exception = assertThrows(
-            NullPointerException.class,
-            () -> testStream.flatTransform(flatTransformerSupplier, (Named) 
null, "storeName"));
-        assertThat(exception.getMessage(), equalTo("named can't be null"));
-    }
-
     @Test
     @SuppressWarnings("deprecation")
     public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 0d9321b0ebd..5e6cc4f3f22 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.scala
 package kstream
 
-import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
   GlobalKTable,
   JoinWindows,
   KStream => KStreamJ,
   Printed,
-  TransformerSupplier,
   ValueTransformerSupplier,
   ValueTransformerWithKeySupplier
 }
@@ -36,7 +34,6 @@ import 
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
   KeyValueMapperFromFunction,
   MapperFromFunction,
   PredicateFromFunction,
-  TransformerSupplierAsJava,
   ValueMapperFromFunction,
   ValueMapperWithKeyFromFunction,
   ValueTransformerSupplierAsJava,
@@ -495,52 +492,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def toTable(named: Named, materialized: Materialized[K, V, 
ByteArrayKeyValueStore]): KTable[K, V] =
     new KTable(inner.toTable(named, materialized))
 
-  /**
-   * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
-   * can be altered arbitrarily).
-   * A `Transformer` (provided by the given `TransformerSupplier`) is applied 
to each input record
-   * and computes zero or more output records.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `Transformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param transformerSupplier the `TransformerSuplier` that generates 
`Transformer`
-   * @param stateStoreNames     the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains more or less records with new key and 
value (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transform`
-   */
-  @deprecated(since = "3.3", message = "Use process(ProcessorSupplier, 
String*) instead.")
-  def flatTransform[K1, V1](
-    transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
-    stateStoreNames: String*
-  ): KStream[K1, V1] =
-    new KStream(inner.flatTransform(transformerSupplier.asJava, 
stateStoreNames: _*))
-
-  /**
-   * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
-   * can be altered arbitrarily).
-   * A `Transformer` (provided by the given `TransformerSupplier`) is applied 
to each input record
-   * and computes zero or more output records.
-   * In order to assign a state, the state must be created and added via 
`addStateStore` before they can be connected
-   * to the `Transformer`.
-   * It's not required to connect global state stores that are added via 
`addGlobalStore`;
-   * read-only access to global state stores is available by default.
-   *
-   * @param transformerSupplier the `TransformerSuplier` that generates 
`Transformer`
-   * @param named               a [[Named]] config used to name the processor 
in the topology
-   * @param stateStoreNames     the names of the state stores used by the 
processor
-   * @return a [[KStream]] that contains more or less records with new key and 
value (possibly of different type)
-   * @see `org.apache.kafka.streams.kstream.KStream#transform`
-   */
-  @deprecated(since = "3.3", message = "Use process(ProcessorSupplier, Named, 
String*) instead.")
-  def flatTransform[K1, V1](
-    transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
-    named: Named,
-    stateStoreNames: String*
-  ): KStream[K1, V1] =
-    new KStream(inner.flatTransform(transformerSupplier.asJava, named, 
stateStoreNames: _*))
-
   /**
    * Transform the value of each input record into zero or more records (with 
possible new type) in the
    * output stream.
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 193cb82b53f..88fe8e8980d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -18,11 +18,9 @@ package org.apache.kafka.streams.scala.kstream
 
 import java.time.Duration.ofSeconds
 import java.time.{Duration, Instant}
-import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{
   JoinWindows,
   Named,
-  Transformer,
   ValueTransformer,
   ValueTransformerSupplier,
   ValueTransformerWithKey,
@@ -289,42 +287,6 @@ class KStreamTest extends TestDriver {
     testDriver.close()
   }
 
-  @nowarn
-  @Test
-  def testFlatTransformCorrectlyRecords(): Unit = {
-    class TestTransformer extends Transformer[String, String, 
Iterable[KeyValue[String, String]]] {
-      override def init(context: ProcessorContext): Unit = {}
-
-      override def transform(key: String, value: String): 
Iterable[KeyValue[String, String]] =
-        Array(new KeyValue(s"$key-transformed", s"$value-transformed"))
-
-      override def close(): Unit = {}
-    }
-    val builder = new StreamsBuilder()
-    val sourceTopic = "source"
-    val sinkTopic = "sink"
-
-    val stream = builder.stream[String, String](sourceTopic)
-    stream
-      .flatTransform(() => new TestTransformer)
-      .to(sinkTopic)
-
-    val now = Instant.now()
-    val testDriver = createTestDriver(builder, now)
-    val testInput = testDriver.createInput[String, String](sourceTopic)
-    val testOutput = testDriver.createOutput[String, String](sinkTopic)
-
-    testInput.pipeInput("1", "value", now)
-
-    val result = testOutput.readKeyValue()
-    assertEquals("value-transformed", result.value)
-    assertEquals("1-transformed", result.key)
-
-    assertTrue(testOutput.isEmpty)
-
-    testDriver.close()
-  }
-
   @nowarn
   @Test
   def testCorrectlyFlatTransformValuesInRecords(): Unit = {

Reply via email to