This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95650431dfd KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated
"transformer" methods and classes (#17245)
95650431dfd is described below
commit 95650431dfd2072cfd79efd5536d9eff7b50f3d9
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Thu Nov 7 22:35:17 2024 -0300
KAFKA-16339: [2/4 KStream#flatTransform] Remove Deprecated "transformer"
methods and classes (#17245)
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/kstream/KStream.java | 297 +--------------------
.../streams/kstream/internals/KStreamImpl.java | 42 ---
.../streams/processor/ConnectedStoreProvider.java | 2 -
.../streams/kstream/internals/KStreamImplTest.java | 136 ++--------
.../kafka/streams/scala/kstream/KStream.scala | 49 ----
.../kafka/streams/scala/kstream/KStreamTest.scala | 38 ---
6 files changed, 25 insertions(+), 539 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index a5679becc51..516ffb80227 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -436,7 +436,6 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -488,7 +487,6 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -532,7 +530,6 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -576,7 +573,6 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -625,7 +621,6 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -675,7 +670,6 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #flatTransform(TransformerSupplier, String...)
* @see #transformValues(ValueTransformerSupplier, String...)
* @see #transformValues(ValueTransformerWithKeySupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
@@ -2989,266 +2983,6 @@ public interface KStream<K, V> {
final ValueJoinerWithKey<? super K, ?
super V, ? super GV, ? extends RV> valueJoiner,
final Named named);
- /**
- * Transform each record of the input stream into zero or more records in
the output stream (both key and value type
- * can be altered arbitrarily).
- * A {@link Transformer} (provided by the given {@link
TransformerSupplier}) is applied to each input record and
- * returns zero or more output records.
- * Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K':V'>, <K'':V''>, ...}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #flatMap(KeyValueMapper) flatMap()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #flatMap(KeyValueMapper) flatMap()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}
- * the processing progress can be observed and additional periodic actions
can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transform(new TransformerSupplier() {
- * public Transformer get() {
- * return new MyTransformer();
- * }
- * }, "myTransformState");
- * }</pre>
- * The second strategy is for the given {@link TransformerSupplier} to
implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyTransformerSupplier implements TransformerSupplier {
- * // supply transformer
- * Transformer get() {
- * return new MyTransformer();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myTransformState") is used to
access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransform(new
MyTransformerSupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link Transformer}, the state is
obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link Transformer} must return an {@link java.lang.Iterable} type
(e.g., any {@link java.util.Collection}
- * type) in {@link Transformer#transform(Object, Object) transform()}.
- * The return value of {@link Transformer#transform(Object, Object)
Transformer#transform()} may be {@code null},
- * which is equal to returning an empty {@link java.lang.Iterable
Iterable}, i.e., no records are emitted.
- * <pre>{@code
- * class MyTransformer implements Transformer {
- * private ProcessorContext context;
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.context = context;
- * this.state = context.getStateStore("myTransformState");
- * // punctuate each second; can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable<KeyValue> transform(K key, V value) {
- * // can access this.state
- * List<KeyValue> result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new KeyValue(key, value));
- * }
- * return result; // emits a list of key-value pairs via return
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code flatTransform()}.
- * <p>
- * Transforming records might result in an internal data redistribution if
a key based operator (like an aggregation
- * or join) is applied to the result {@code KStream}.
- * (cf. {@link #transformValues(ValueTransformerSupplier, String...)
transformValues()})
- * <p>
- * Note that it is possible to emit records by using
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
- * context#forward()} in {@link Transformer#transform(Object, Object)
Transformer#transform()} and
- * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}.
- * Be aware that a mismatch between the types of the emitted records and
the type of the stream would only be
- * detected at runtime.
- * To ensure type-safety at compile-time,
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
context#forward()} should
- * not be used in {@link Transformer#transform(Object, Object)
Transformer#transform()} and
- * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}.
- * The supplier should always generate a new instance each time {@link
TransformerSupplier#get()} gets called. Creating
- * a single {@link Transformer} object and returning the same object
reference in {@link TransformerSupplier#get()} would be
- * a violation of the supplier pattern and leads to runtime exceptions.
- *
- * @param transformerSupplier an instance of {@link TransformerSupplier}
that generates a newly constructed {@link Transformer}
- * @param stateStoreNames the names of the state stores used by the
processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return a {@code KStream} that contains more or less records with new
key and value (possibly of different type)
- * @see #flatMap(KeyValueMapper)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
- * @see #process(ProcessorSupplier, String...)
- * @deprecated Since 3.3. Use {@link KStream#process(ProcessorSupplier,
String...)} instead.
- */
- @Deprecated
- <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super
K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final String... stateStoreNames);
-
- /**
- * Transform each record of the input stream into zero or more records in
the output stream (both key and value type
- * can be altered arbitrarily).
- * A {@link Transformer} (provided by the given {@link
TransformerSupplier}) is applied to each input record and
- * returns zero or more output records.
- * Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K':V'>, <K'':V''>, ...}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #flatMap(KeyValueMapper) flatMap()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #flatMap(KeyValueMapper) flatMap()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}
- * the processing progress can be observed and additional periodic actions
can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transform(new TransformerSupplier() {
- * public Transformer get() {
- * return new MyTransformer();
- * }
- * }, "myTransformState");
- * }</pre>
- * The second strategy is for the given {@link TransformerSupplier} to
implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyTransformerSupplier implements TransformerSupplier {
- * // supply transformer
- * Transformer get() {
- * return new MyTransformer();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myTransformState") is used to
access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransform(new
MyTransformerSupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link Transformer}, the state is
obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link Transformer} must return an {@link java.lang.Iterable} type
(e.g., any {@link java.util.Collection}
- * type) in {@link Transformer#transform(Object, Object) transform()}.
- * The return value of {@link Transformer#transform(Object, Object)
Transformer#transform()} may be {@code null},
- * which is equal to returning an empty {@link java.lang.Iterable
Iterable}, i.e., no records are emitted.
- * <pre>{@code
- * class MyTransformer implements Transformer {
- * private ProcessorContext context;
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.context = context;
- * this.state = context.getStateStore("myTransformState");
- * // punctuate each second; can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable<KeyValue> transform(K key, V value) {
- * // can access this.state
- * List<KeyValue> result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new KeyValue(key, value));
- * }
- * return result; // emits a list of key-value pairs via return
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code flatTransform()}.
- * <p>
- * Transforming records might result in an internal data redistribution if
a key based operator (like an aggregation
- * or join) is applied to the result {@code KStream}.
- * (cf. {@link #transformValues(ValueTransformerSupplier, String...)
transformValues()})
- * <p>
- * Note that it is possible to emit records by using
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
- * context#forward()} in {@link Transformer#transform(Object, Object)
Transformer#transform()} and
- * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}.
- * Be aware that a mismatch between the types of the emitted records and
the type of the stream would only be
- * detected at runtime.
- * To ensure type-safety at compile-time,
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
context#forward()} should
- * not be used in {@link Transformer#transform(Object, Object)
Transformer#transform()} and
- * {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)
Punctuator#punctuate()}.
- * The supplier should always generate a new instance each time {@link
TransformerSupplier#get()} gets called. Creating
- * a single {@link Transformer} object and returning the same object
reference in {@link TransformerSupplier#get()} would be
- * a violation of the supplier pattern and leads to runtime exceptions.
- *
- * @param transformerSupplier an instance of {@link TransformerSupplier}
that generates a newly constructed {@link Transformer}
- * @param named a {@link Named} config used to name the
processor in the topology
- * @param stateStoreNames the names of the state stores used by the
processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return a {@code KStream} that contains more or less records with new
key and value (possibly of different type)
- * @see #flatMap(KeyValueMapper)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
- * @see #process(ProcessorSupplier, String...)
- * @deprecated Since 3.3. Use {@link KStream#process(ProcessorSupplier,
Named, String...)} instead.
- */
- @Deprecated
- <K1, V1> KStream<K1, V1> flatTransform(final TransformerSupplier<? super
K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final Named named,
- final String... stateStoreNames);
-
/**
* Transform the value of each input record into a new value (with
possibly a new type) of the output record.
* A {@link ValueTransformer} (provided by the given {@link
ValueTransformerSupplier}) is applied to each input
@@ -3645,8 +3379,7 @@ public interface KStream<K, V> {
* a schedule must be registered.
* The {@link ValueTransformerWithKey} must return the new value in
* {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
- * In contrast to {@link #flatTransform(TransformerSupplier, String...)
flatTransform()}, no additional {@link
- * KeyValue} pairs can be emitted via
+ * No additional {@link KeyValue} pairs can be emitted via
* {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
@@ -3763,8 +3496,7 @@ public interface KStream<K, V> {
* transform()}.
* If the return value of {@link ValueTransformer#transform(Object)
ValueTransformer#transform()} is an empty
* {@link java.lang.Iterable Iterable} or {@code null}, no records are
emitted.
- * In contrast to {@link #flatTransform(TransformerSupplier, String...)
flatTransform()}, no additional {@link
- * KeyValue} pairs can be emitted via
+ * No additional {@link KeyValue} pairs can be emitted via
* {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
@@ -3798,8 +3530,7 @@ public interface KStream<K, V> {
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link
#flatTransform(TransformerSupplier, String...)
- * flatTransform()})
+ * is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link
ValueTransformerSupplier} that generates a newly constructed {@link
ValueTransformer}
* The supplier should always generate a
new instance. Creating a single {@link ValueTransformer} object
@@ -3812,7 +3543,6 @@ public interface KStream<K, V> {
* different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
- * @see #flatTransform(TransformerSupplier, String...)
* @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
*/
@Deprecated
@@ -3886,8 +3616,7 @@ public interface KStream<K, V> {
* transform()}.
* If the return value of {@link ValueTransformer#transform(Object)
ValueTransformer#transform()} is an empty
* {@link java.lang.Iterable Iterable} or {@code null}, no records are
emitted.
- * In contrast to {@link #flatTransform(TransformerSupplier, String...)
flatTransform()}, no additional {@link
- * KeyValue} pairs can be emitted via
+ * No additional {@link KeyValue} pairs can be emitted via
* {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformer} tries to
* emit a {@link KeyValue} pair.
@@ -3921,8 +3650,7 @@ public interface KStream<K, V> {
* <p>
* Setting a new value preserves data co-location with respect to the key.
* Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link
#flatTransform(TransformerSupplier, String...)
- * flatTransform()})
+ * is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link
ValueTransformerSupplier} that generates a newly constructed {@link
ValueTransformer}
* The supplier should always generate a
new instance. Creating a single {@link ValueTransformer} object
@@ -3936,7 +3664,6 @@ public interface KStream<K, V> {
* different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
- * @see #flatTransform(TransformerSupplier, String...)
* @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
*/
@Deprecated
@@ -4011,8 +3738,7 @@ public interface KStream<K, V> {
* transform()}.
* If the return value of {@link ValueTransformerWithKey#transform(Object,
Object) ValueTransformerWithKey#transform()}
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no
records are emitted.
- * In contrast to {@link #flatTransform(TransformerSupplier, String...)
flatTransform()}, no additional {@link
- * KeyValue} pairs can be emitted via
+ * No additional {@link KeyValue} pairs can be emitted via
* {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
@@ -4047,8 +3773,7 @@ public interface KStream<K, V> {
* Note that the key is read-only and should not be modified, as this can
lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the
key.
* Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link
#flatTransform(TransformerSupplier, String...)
- * flatTransform()})
+ * is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link
ValueTransformerWithKeySupplier} that generates a newly constructed {@link
ValueTransformerWithKey}
* The supplier should always generate a
new instance. Creating a single {@link ValueTransformerWithKey} object
@@ -4061,7 +3786,6 @@ public interface KStream<K, V> {
* different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
- * @see #flatTransform(TransformerSupplier, String...)
* @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
*/
@Deprecated
@@ -4135,8 +3859,7 @@ public interface KStream<K, V> {
* transform()}.
* If the return value of {@link ValueTransformerWithKey#transform(Object,
Object) ValueTransformerWithKey#transform()}
* is an empty {@link java.lang.Iterable Iterable} or {@code null}, no
records are emitted.
- * In contrast to {@link #flatTransform(TransformerSupplier, String...)
flatTransform()}, no additional {@link
- * KeyValue} pairs can be emitted via
+ * No additional {@link KeyValue} pairs can be emitted via
* {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
* A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformerWithKey} tries
* to emit a {@link KeyValue} pair.
@@ -4171,8 +3894,7 @@ public interface KStream<K, V> {
* Note that the key is read-only and should not be modified, as this can
lead to corrupt partitioning.
* So, setting a new value preserves data co-location with respect to the
key.
* Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}. (cf. {@link
#flatTransform(TransformerSupplier, String...)
- * flatTransform()})
+ * is applied to the result {@code KStream}.
*
* @param valueTransformerSupplier an instance of {@link
ValueTransformerWithKeySupplier} that generates a newly constructed {@link
ValueTransformerWithKey}
* The supplier should always generate a
new instance. Creating a single {@link ValueTransformerWithKey} object
@@ -4186,7 +3908,6 @@ public interface KStream<K, V> {
* different type)
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
- * @see #flatTransform(TransformerSupplier, String...)
* @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
*/
@Deprecated
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 3dfd1096bc4..a23c5ad4b0b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1209,48 +1209,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
builder);
}
- @Override
- @Deprecated
- public <K1, V1> KStream<K1, V1> flatTransform(final
org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V,
Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final String...
stateStoreNames) {
- Objects.requireNonNull(transformerSupplier, "transformerSupplier can't
be null");
- final String name = builder.newProcessorName(TRANSFORM_NAME);
- return flatTransform(transformerSupplier, Named.as(name),
stateStoreNames);
- }
-
- @Override
- @Deprecated
- public <K1, V1> KStream<K1, V1> flatTransform(final
org.apache.kafka.streams.kstream.TransformerSupplier<? super K, ? super V,
Iterable<KeyValue<K1, V1>>> transformerSupplier,
- final Named named,
- final String...
stateStoreNames) {
- Objects.requireNonNull(transformerSupplier, "transformerSupplier can't
be null");
- Objects.requireNonNull(named, "named can't be null");
- Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a
null array");
- ApiUtils.checkSupplier(transformerSupplier);
- for (final String stateStoreName : stateStoreNames) {
- Objects.requireNonNull(stateStoreName, "stateStoreNames can't
contain `null` as store name");
- }
-
- final String name = new NamedInternal(named).name();
- final StatefulProcessorNode<? super K, ? super V> transformNode = new
StatefulProcessorNode<>(
- name,
- new ProcessorParameters<>(new
KStreamFlatTransform<>(transformerSupplier), name),
- stateStoreNames);
- transformNode.keyChangingOperation(true);
-
- builder.addGraphNode(graphNode, transformNode);
-
- // cannot inherit key and value serde
- return new KStreamImpl<>(
- name,
- null,
- null,
- subTopologySourceNodes,
- true,
- transformNode,
- builder);
- }
-
@Override
@Deprecated
public <VR> KStream<K, VR> transformValues(final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends
VR> valueTransformerSupplier,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
index f5bc9cf03c0..91824d5a5b8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
@@ -95,8 +95,6 @@ import java.util.Set;
* @see
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
Named, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see KStream#transformValues(ValueTransformerWithKeySupplier, Named,
String...)
- * @see
KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier,
String...)
- * @see
KStream#flatTransform(org.apache.kafka.streams.kstream.TransformerSupplier,
Named, String...)
* @see
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
String...)
* @see
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3a65ba5fd83..226fc357a6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -114,20 +114,6 @@ public class KStreamImplTest {
private final MockApiProcessorSupplier<String, String, Void, Void>
processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier<String, String, Void>
fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
@SuppressWarnings("deprecation")
- private final org.apache.kafka.streams.kstream.TransformerSupplier<String,
String, Iterable<KeyValue<String, String>>> flatTransformerSupplier =
- () -> new org.apache.kafka.streams.kstream.Transformer<String, String,
Iterable<KeyValue<String, String>>>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public Iterable<KeyValue<String, String>> transform(final String
key, final String value) {
- return Collections.singleton(new KeyValue<>(key, value));
- }
-
- @Override
- public void close() {}
- };
- @SuppressWarnings("deprecation")
private final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String>
valueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String,
String>() {
@Override
@@ -1576,139 +1562,49 @@ public class KStreamImplTest {
}
@Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowBadTransformerSupplierOnFlatTransform() {
- final org.apache.kafka.streams.kstream.Transformer<String, String,
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcess() {
+ final org.apache.kafka.streams.processor.api.Processor<String, String,
Void, Void> processor =
+ processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
- () -> testStream.flatTransform(() -> transformer)
+ () -> testStream.process(() -> processor)
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowBadTransformerSupplierOnFlatTransformWithStores() {
- final org.apache.kafka.streams.kstream.Transformer<String, String,
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessWithStores() {
+ final org.apache.kafka.streams.processor.api.Processor<String, String,
Void, Void> processor =
+ processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
- () -> testStream.flatTransform(() -> transformer, "storeName")
+ () -> testStream.process(() -> processor, "storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamed()
{
- final org.apache.kafka.streams.kstream.Transformer<String, String,
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessWithNamed() {
+ final org.apache.kafka.streams.processor.api.Processor<String, String,
Void, Void> processor =
+ processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
- () -> testStream.flatTransform(() -> transformer,
Named.as("flatTransformer"))
+ () -> testStream.process(() -> processor,
Named.as("processor"))
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowBadTransformerSupplierOnFlatTransformWithNamedAndStores() {
- final org.apache.kafka.streams.kstream.Transformer<String, String,
Iterable<KeyValue<String, String>>> transformer = flatTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessWithNamedAndStores() {
+ final org.apache.kafka.streams.processor.api.Processor<String, String,
Void, Void> processor =
+ processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
- () -> testStream.flatTransform(() -> transformer,
Named.as("flatTransformer"), "storeName")
+ () -> testStream.process(() -> processor, Named.as("processor"),
"storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullTransformerSupplierOnFlatTransform() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(null));
- assertThat(exception.getMessage(), equalTo("transformerSupplier can't
be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullTransformerSupplierOnFlatTransformWithStores() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(null, "storeName"));
- assertThat(exception.getMessage(), equalTo("transformerSupplier can't
be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamed() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(null, Named.as("flatTransformer")));
- assertThat(exception.getMessage(), equalTo("transformerSupplier can't
be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullTransformerSupplierOnFlatTransformWithNamedAndStores() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(null, Named.as("flatTransformer"),
"storeName"));
- assertThat(exception.getMessage(), equalTo("transformerSupplier can't
be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullStoreNamesOnFlatTransform() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier, (String[])
null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullStoreNameOnFlatTransform() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier, (String)
null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullStoreNamesOnFlatTransformWithNamed() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier,
Named.as("flatTransform"), (String[]) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullStoreNameOnFlatTransformWithNamed() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier,
Named.as("flatTransform"), (String) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullNamedOnFlatTransform() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier, (Named)
null));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullNamedOnFlatTransformWithStoreName() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransform(flatTransformerSupplier, (Named)
null, "storeName"));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
@Test
@SuppressWarnings("deprecation")
public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 0d9321b0ebd..5e6cc4f3f22 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -17,13 +17,11 @@
package org.apache.kafka.streams.scala
package kstream
-import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
GlobalKTable,
JoinWindows,
KStream => KStreamJ,
Printed,
- TransformerSupplier,
ValueTransformerSupplier,
ValueTransformerWithKeySupplier
}
@@ -36,7 +34,6 @@ import
org.apache.kafka.streams.scala.FunctionsCompatConversions.{
KeyValueMapperFromFunction,
MapperFromFunction,
PredicateFromFunction,
- TransformerSupplierAsJava,
ValueMapperFromFunction,
ValueMapperWithKeyFromFunction,
ValueTransformerSupplierAsJava,
@@ -495,52 +492,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
def toTable(named: Named, materialized: Materialized[K, V,
ByteArrayKeyValueStore]): KTable[K, V] =
new KTable(inner.toTable(named, materialized))
- /**
- * Transform each record of the input stream into zero or more records in
the output stream (both key and value type
- * can be altered arbitrarily).
- * A `Transformer` (provided by the given `TransformerSupplier`) is applied
to each input record
- * and computes zero or more output records.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `Transformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param transformerSupplier the `TransformerSuplier` that generates
`Transformer`
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains more or less records with new key and
value (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transform`
- */
- @deprecated(since = "3.3", message = "Use process(ProcessorSupplier,
String*) instead.")
- def flatTransform[K1, V1](
- transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
- stateStoreNames: String*
- ): KStream[K1, V1] =
- new KStream(inner.flatTransform(transformerSupplier.asJava,
stateStoreNames: _*))
-
- /**
- * Transform each record of the input stream into zero or more records in
the output stream (both key and value type
- * can be altered arbitrarily).
- * A `Transformer` (provided by the given `TransformerSupplier`) is applied
to each input record
- * and computes zero or more output records.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `Transformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param transformerSupplier the `TransformerSuplier` that generates
`Transformer`
- * @param named a [[Named]] config used to name the processor
in the topology
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains more or less records with new key and
value (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transform`
- */
- @deprecated(since = "3.3", message = "Use process(ProcessorSupplier, Named,
String*) instead.")
- def flatTransform[K1, V1](
- transformerSupplier: TransformerSupplier[K, V, Iterable[KeyValue[K1, V1]]],
- named: Named,
- stateStoreNames: String*
- ): KStream[K1, V1] =
- new KStream(inner.flatTransform(transformerSupplier.asJava, named,
stateStoreNames: _*))
-
/**
* Transform the value of each input record into zero or more records (with
possible new type) in the
* output stream.
diff --git
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 193cb82b53f..88fe8e8980d 100644
---
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -18,11 +18,9 @@ package org.apache.kafka.streams.scala.kstream
import java.time.Duration.ofSeconds
import java.time.{Duration, Instant}
-import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{
JoinWindows,
Named,
- Transformer,
ValueTransformer,
ValueTransformerSupplier,
ValueTransformerWithKey,
@@ -289,42 +287,6 @@ class KStreamTest extends TestDriver {
testDriver.close()
}
- @nowarn
- @Test
- def testFlatTransformCorrectlyRecords(): Unit = {
- class TestTransformer extends Transformer[String, String,
Iterable[KeyValue[String, String]]] {
- override def init(context: ProcessorContext): Unit = {}
-
- override def transform(key: String, value: String):
Iterable[KeyValue[String, String]] =
- Array(new KeyValue(s"$key-transformed", s"$value-transformed"))
-
- override def close(): Unit = {}
- }
- val builder = new StreamsBuilder()
- val sourceTopic = "source"
- val sinkTopic = "sink"
-
- val stream = builder.stream[String, String](sourceTopic)
- stream
- .flatTransform(() => new TestTransformer)
- .to(sinkTopic)
-
- val now = Instant.now()
- val testDriver = createTestDriver(builder, now)
- val testInput = testDriver.createInput[String, String](sourceTopic)
- val testOutput = testDriver.createOutput[String, String](sinkTopic)
-
- testInput.pipeInput("1", "value", now)
-
- val result = testOutput.readKeyValue()
- assertEquals("value-transformed", result.value)
- assertEquals("1-transformed", result.key)
-
- assertTrue(testOutput.isEmpty)
-
- testDriver.close()
- }
-
@nowarn
@Test
def testCorrectlyFlatTransformValuesInRecords(): Unit = {