This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 866f0cc3080 KAFKA-16339: [3/4 KStream#transformValues] Remove
Deprecated "transformer" methods and classes (#17266)
866f0cc3080 is described below
commit 866f0cc30808c7150f82d7a08d1f37611ed28ef9
Author: Joao Pedro Fonseca Dantas <[email protected]>
AuthorDate: Fri Nov 22 20:07:03 2024 -0300
KAFKA-16339: [3/4 KStream#transformValues] Remove Deprecated "transformer"
methods and classes (#17266)
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/kstream/KStream.java | 511 ++-------------------
.../kafka/streams/kstream/TransformerSupplier.java | 1 -
.../kafka/streams/kstream/ValueTransformer.java | 5 +-
.../streams/kstream/ValueTransformerSupplier.java | 2 -
.../streams/kstream/ValueTransformerWithKey.java | 5 +-
.../kstream/ValueTransformerWithKeySupplier.java | 2 -
.../streams/kstream/internals/KStreamImpl.java | 73 ---
.../kstream/internals/KStreamTransformValues.java | 74 ---
.../streams/processor/ConnectedStoreProvider.java | 6 +-
.../apache/kafka/streams/StreamsBuilderTest.java | 18 -
.../streams/kstream/internals/KStreamImplTest.java | 295 +-----------
.../internals/KStreamTransformValuesTest.java | 154 -------
.../kafka/streams/scala/kstream/KStream.scala | 88 ----
13 files changed, 53 insertions(+), 1181 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 241df1cd3ec..5c8c5ef092d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -48,8 +48,8 @@ import org.apache.kafka.streams.state.StoreBuilder;
* A {@code KStream} can be transformed record by record, joined with another
{@code KStream}, {@link KTable},
* {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
* Kafka Streams DSL can be mixed-and-matched with Processor API (PAPI) (cf.
{@link Topology}) via
- * {@link #process(ProcessorSupplier, String...) process(...)} and {@link
#transformValues(ValueTransformerSupplier,
- * String...) transformValues(...)}.
+ * {@link #process(ProcessorSupplier, String...) process(...)} and {@link
#processValues(FixedKeyProcessorSupplier,
+ * String...) processValues(...)}.
*
* @param <K> Type of keys
* @param <V> Type of values
@@ -206,8 +206,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper);
@@ -245,8 +244,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
extends KeyValue<? extends KR, ? extends VR>> mapper,
final Named named);
@@ -256,7 +254,7 @@ public interface KStream<K, V> {
* The provided {@link ValueMapper} is applied to each input record value
and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerSupplier, String...)} for
stateful value transformation).
+ * {@link #processValues(FixedKeyProcessorSupplier, String...)} for
stateful value processing).
* <p>
* The example below counts the number of tokens in the value string.
* <pre>{@code
@@ -280,8 +278,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR>
mapper);
@@ -290,7 +287,7 @@ public interface KStream<K, V> {
* The provided {@link ValueMapper} is applied to each input record value
and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerSupplier, String...)} for
stateful value transformation).
+ * {@link #processValues(FixedKeyProcessorSupplier, String...)} for
stateful value processing).
* <p>
* The example below counts the number of tokens in the value string.
* <pre>{@code
@@ -315,8 +312,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR>
mapper,
final Named named);
@@ -326,7 +322,7 @@ public interface KStream<K, V> {
* The provided {@link ValueMapperWithKey} is applied to each input record
value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
for stateful value transformation).
+ * {@link #processValues(FixedKeyProcessorSupplier, String...)} for
stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
@@ -351,8 +347,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
V, ? extends VR> mapper);
@@ -361,7 +356,7 @@ public interface KStream<K, V> {
* The provided {@link ValueMapperWithKey} is applied to each input record
value and computes a new value for it.
* Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
* This is a stateless record-by-record operation (cf.
- * {@link #transformValues(ValueTransformerWithKeySupplier, String...)}
for stateful value transformation).
+ * {@link #processValues(FixedKeyProcessorSupplier, String...)} for
stateful value processing).
* <p>
* The example below counts the number of tokens of key and value strings.
* <pre>{@code
@@ -387,8 +382,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
*/
<VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super
V, ? extends VR> mapper,
final Named named);
@@ -436,8 +430,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -487,8 +480,7 @@ public interface KStream<K, V> {
* @see #flatMapValues(ValueMapper)
* @see #flatMapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -502,8 +494,8 @@ public interface KStream<K, V> {
* stream (value type can be altered arbitrarily).
* The provided {@link ValueMapper} is applied to each input record and
computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K:V'>, <K:V''>, ...}.
- * This is a stateless record-by-record operation (cf. {@link
#transformValues(ValueTransformerSupplier, String...)}
- * for stateful value transformation).
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful value processing).
* <p>
* The example below splits input records {@code <null:String>} containing
sentences as values into their words.
* <pre>{@code
@@ -530,8 +522,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -544,8 +535,8 @@ public interface KStream<K, V> {
* stream (value type can be altered arbitrarily).
* The provided {@link ValueMapper} is applied to each input record and
computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K:V'>, <K:V''>, ...}.
- * This is a stateless record-by-record operation (cf. {@link
#transformValues(ValueTransformerSupplier, String...)}
- * for stateful value transformation).
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful value processing).
* <p>
* The example below splits input records {@code <null:String>} containing
sentences as values into their words.
* <pre>{@code
@@ -573,8 +564,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -587,8 +577,8 @@ public interface KStream<K, V> {
* stream (value type can be altered arbitrarily).
* The provided {@link ValueMapperWithKey} is applied to each input record
and computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K:V'>, <K:V''>, ...}.
- * This is a stateless record-by-record operation (cf. {@link
#transformValues(ValueTransformerWithKeySupplier, String...)}
- * for stateful value transformation).
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful value processing).
* <p>
* The example below splits input records {@code <Integer:String>}, with
key=1, containing sentences as values
* into their words.
@@ -621,8 +611,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -635,8 +624,8 @@ public interface KStream<K, V> {
* stream (value type can be altered arbitrarily).
* The provided {@link ValueMapperWithKey} is applied to each input record
and computes zero or more output values.
* Thus, an input record {@code <K,V>} can be transformed into output
records {@code <K:V'>, <K:V''>, ...}.
- * This is a stateless record-by-record operation (cf. {@link
#transformValues(ValueTransformerWithKeySupplier, String...)}
- * for stateful value transformation).
+ * This is a stateless record-by-record operation (cf. {@link
#processValues(FixedKeyProcessorSupplier, String...)}
+ * for stateful value processing).
* <p>
* The example below splits input records {@code <Integer:String>}, with
key=1, containing sentences as values
* into their words.
@@ -670,8 +659,7 @@ public interface KStream<K, V> {
* @see #mapValues(ValueMapper)
* @see #mapValues(ValueMapperWithKey)
* @see #process(ProcessorSupplier, String...)
- * @see #transformValues(ValueTransformerSupplier, String...)
- * @see #transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see #processValues(FixedKeyProcessorSupplier, String...)
* @see #flatTransformValues(ValueTransformerSupplier, String...)
* @see #flatTransformValues(ValueTransformerWithKeySupplier, String...)
*/
@@ -2984,453 +2972,6 @@ public interface KStream<K, V> {
final KeyValueMapper<? super K, ?
super V, ? extends GK> keySelector,
final ValueJoinerWithKey<? super K, ?
super V, ? super GV, ? extends RV> valueJoiner,
final Named named);
-
- /**
- * Transform the value of each input record into a new value (with
possibly a new type) of the output record.
- * A {@link ValueTransformer} (provided by the given {@link
ValueTransformerSupplier}) is applied to each input
- * record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #mapValues(ValueMapper) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing
progress
- * can be observed and additional periodic actions can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transformValues(new
ValueTransformerSupplier() {
- * public ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- * }, "myValueTransformState");
- * }</pre>
- * The second strategy is for the given {@link ValueTransformerSupplier}
to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyValueTransformerSupplier implements ValueTransformerSupplier {
- * // supply transformer
- * ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myValueTransformState") is
used to access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.transformValues(new
MyValueTransformerSupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link ValueTransformer}, the state is
obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformer} must return the new value in {@link
ValueTransformer#transform(Object) transform()}.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
- * <pre>{@code
- * class MyValueTransformer implements ValueTransformer {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * NewValueType transform(V value) {
- * // can access this.state
- * return new NewValueType(); // or null
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code transformValues()}.
- * <p>
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link
ValueTransformerSupplier} that generates a newly constructed {@link
ValueTransformer}
- * The supplier should always generate a
new instance. Creating a single {@link ValueTransformer} object
- * and returning the same object reference
in {@link ValueTransformerSupplier#get()} is a
- * violation of the supplier pattern and
leads to runtime exceptions.
- * @param stateStoreNames the names of the state stores used by
the processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and
new values (possibly of different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
- */
- @Deprecated
- <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super
V, ? extends VR> valueTransformerSupplier,
- final String... stateStoreNames);
- /**
- * Transform the value of each input record into a new value (with
possibly a new type) of the output record.
- * A {@link ValueTransformer} (provided by the given {@link
ValueTransformerSupplier}) is applied to each input
- * record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #mapValues(ValueMapper) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides
all the functionality of the PAPI.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing
progress
- * can be observed and additional periodic actions can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transformValues(new
ValueTransformerSupplier() {
- * public ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- * }, "myValueTransformState");
- * }</pre>
- * The second strategy is for the given {@link ValueTransformerSupplier}
to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyValueTransformerSupplier implements ValueTransformerSupplier {
- * // supply transformer
- * ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myValueTransformState") is
used to access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.transformValues(new
MyValueTransformerSupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link ValueTransformer}, the state is
obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformer} must return the new value in {@link
ValueTransformer#transform(Object) transform()}.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
- * <pre>{@code
- * class MyValueTransformer implements ValueTransformer {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * NewValueType transform(V value) {
- * // can access this.state
- * return new NewValueType(); // or null
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code transformValues()}.
- * <p>
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link
ValueTransformerSupplier} that generates a newly constructed {@link
ValueTransformer}
- * The supplier should always generate a
new instance. Creating a single {@link ValueTransformer} object
- * and returning the same object reference
in {@link ValueTransformerSupplier#get()} is a
- * violation of the supplier pattern and
leads to runtime exceptions.
- * @param named a {@link Named} config used to name the
processor in the topology
- * @param stateStoreNames the names of the state stores used by
the processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and
new values (possibly of different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
- */
- @Deprecated
- <VR> KStream<K, VR> transformValues(final ValueTransformerSupplier<? super
V, ? extends VR> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames);
-
- /**
- * Transform the value of each input record into a new value (with
possibly a new type) of the output record.
- * A {@link ValueTransformerWithKey} (provided by the given {@link
ValueTransformerWithKeySupplier}) is applied to
- * each input record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #mapValues(ValueMapperWithKey) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides
all the functionality of the PAPI.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing
progress
- * can be observed and additional periodic actions can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transformValues(new
ValueTransformerWithKeySupplier() {
- * public ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- * }, "myValueTransformState");
- * }</pre>
- * The second strategy is for the given {@link
ValueTransformerWithKeySupplier} to implement {@link
ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyValueTransformerWithKeySupplier implements
ValueTransformerWithKeySupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myValueTransformState") is
used to access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.transformValues(new
MyValueTransformerWithKeySupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link ValueTransformerWithKey}, the
state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformerWithKey} must return the new value in
- * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformerWithKey} tries
- * to emit a {@link KeyValue} pair.
- * <pre>{@code
- * class MyValueTransformerWithKey implements ValueTransformerWithKey {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * NewValueType transform(K readOnlyKey, V value) {
- * // can access this.state and use read-only key
- * return new NewValueType(readOnlyKey); // or null
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code transformValues()}.
- * <p>
- * Note that the key is read-only and should not be modified, as this can
lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the
key.
- * Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link
ValueTransformerWithKeySupplier} that generates a newly constructed {@link
ValueTransformerWithKey}
- * The supplier should always generate a
new instance. Creating a single {@link ValueTransformerWithKey} object
- * and returning the same object reference
in {@link ValueTransformerWithKeySupplier#get()} is a
- * violation of the supplier pattern and
leads to runtime exceptions.
- * @param stateStoreNames the names of the state stores used by
the processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and
new values (possibly of different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
- */
- @Deprecated
- <VR> KStream<K, VR> transformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerSupplier,
- final String... stateStoreNames);
-
- /**
- * Transform the value of each input record into a new value (with
possibly a new type) of the output record.
- * A {@link ValueTransformerWithKey} (provided by the given {@link
ValueTransformerWithKeySupplier}) is applied to
- * each input record value and computes a new value for it.
- * Thus, an input record {@code <K,V>} can be transformed into an output
record {@code <K:V'>}.
- * Attaching a state store makes this a stateful record-by-record
operation (cf. {@link #mapValues(ValueMapperWithKey) mapValues()}).
- * If you choose not to attach one, this operation is similar to the
stateless {@link #mapValues(ValueMapperWithKey) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides
all the functionality of the PAPI.
- * Furthermore, via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing
progress
- * can be observed and additional periodic actions can be performed.
- * <p>
- * In order for the transformer to use state stores, the stores must be
added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not
required to connect global state stores; read-only
- * access to global state stores is available by default).
- * <p>
- * The first strategy is to manually add the {@link StoreBuilder}s via
{@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be
connected to the transformer.
- * <pre>{@code
- * // create store
- * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.transformValues(new
ValueTransformerWithKeySupplier() {
- * public ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- * }, "myValueTransformState");
- * }</pre>
- * The second strategy is for the given {@link
ValueTransformerWithKeySupplier} to implement {@link
ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to
the topology and connected to the transformer.
- * <pre>{@code
- * class MyValueTransformerWithKeySupplier implements
ValueTransformerWithKeySupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the
associated transformer
- * // the store name from the builder ("myValueTransformState") is
used to access the store later via the ProcessorContext
- * Set<StoreBuilder> stores() {
- * StoreBuilder<KeyValueStore<String, String>>
keyValueStoreBuilder =
- *
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.transformValues(new
MyValueTransformerWithKeySupplier());
- * }</pre>
- * <p>
- * With either strategy, within the {@link ValueTransformerWithKey}, the
state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformerWithKey} must return the new value in
- * {@link ValueTransformerWithKey#transform(Object, Object) transform()}.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link
org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object)
ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if
the {@link ValueTransformerWithKey} tries
- * to emit a {@link KeyValue} pair.
- * <pre>{@code
- * class MyValueTransformerWithKey implements ValueTransformerWithKey {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1),
PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * NewValueType transform(K readOnlyKey, V value) {
- * // can access this.state and use read-only key
- * return new NewValueType(readOnlyKey); // or null
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }</pre>
- * Even if any upstream operation was key-changing, no auto-repartition is
triggered.
- * If repartitioning is required, a call to {@link #repartition()} should
be performed before
- * {@code transformValues()}.
- * <p>
- * Note that the key is read-only and should not be modified, as this can
lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the
key.
- * Thus, <em>no</em> internal data redistribution is required if a key
based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link
ValueTransformerWithKeySupplier} that generates a newly constructed {@link
ValueTransformerWithKey}
- * The supplier should always generate a
new instance. Creating a single {@link ValueTransformerWithKey} object
- * and returning the same object reference
in {@link ValueTransformerWithKey} is a
- * violation of the supplier pattern and
leads to runtime exceptions.
- * @param named a {@link Named} config used to name the
processor in the topology
- * @param stateStoreNames the names of the state stores used by
the processor; not required if the supplier
- * implements {@link
ConnectedStoreProvider#stores()}
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains records with unmodified key and
new values (possibly of different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link
KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
- */
- @Deprecated
- <VR> KStream<K, VR> transformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames);
/**
* Transform the value of each input record into zero or more new values
(with possibly a new
* type) and emit for each new value a record with the same key of the
input record and the value.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
index 228b1d71231..222cdc1bbc2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
@@ -35,7 +35,6 @@ import java.util.function.Supplier;
* @see Transformer
* @see ValueTransformer
* @see ValueTransformerSupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
* @deprecated Since 4.0. Use {@link
org.apache.kafka.streams.processor.api.ProcessorSupplier api.ProcessorSupplier}
instead.
*/
@Deprecated
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 6d4e4fe8f1c..ae1d21334ca 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -44,8 +44,7 @@ import java.time.Duration;
* @param <VR> transformed value type
* @see ValueTransformerSupplier
* @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see KTable#transformValues(ValueTransformerWithKeySupplier, Materialized,
String...)
* @see Transformer
* @deprecated Since 4.0. Use {@link FixedKeyProcessor} instead.
*/
@@ -77,7 +76,7 @@ public interface ValueTransformer<V, VR> {
/**
* Transform the given value to a new value.
- * Additionally, any {@link StateStore} that is {@link
KStream#transformValues(ValueTransformerSupplier, String...)
+ * Additionally, any {@link StateStore} that is {@link
KTable#transformValues(ValueTransformerWithKeySupplier, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
index b0008744eac..6a4c25b0c1c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
@@ -31,8 +31,6 @@ import
org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
* @see ValueTransformer
* @see ValueTransformerWithKey
* @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
* @deprecated Since 4.0. Use {@link FixedKeyProcessorSupplier} instead.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
index 9c3552622ad..cc0c38d01ef 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKey.java
@@ -47,8 +47,7 @@ import java.time.Duration;
* @param <VR> transformed value type
* @see ValueTransformer
* @see ValueTransformerWithKeySupplier
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+ * @see KTable#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
*/
@@ -77,7 +76,7 @@ public interface ValueTransformerWithKey<K, V, VR> {
/**
* Transform the given [key and] value to a new value.
- * Additionally, any {@link StateStore} that is {@link
KStream#transformValues(ValueTransformerWithKeySupplier, String...)
+ * Additionally, any {@link StateStore} that is {@link
KTable#transformValues(ValueTransformerWithKeySupplier, Named, String...)
* attached} to this operator can be accessed and modified arbitrarily (cf.
* {@link ProcessorContext#getStateStore(String)}).
* <p>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
index 1c0feb0015e..8b1e995f1c3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerWithKeySupplier.java
@@ -32,8 +32,6 @@ import java.util.function.Supplier;
* @param <VR> transformed value type
* @see ValueTransformer
* @see ValueTransformerWithKey
- * @see KStream#transformValues(ValueTransformerSupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
* @see Transformer
* @see TransformerSupplier
*/
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index a23c5ad4b0b..ec2fd211efb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -121,8 +121,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V>
implements KStream<K
private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
- private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
-
private static final String TRANSFORMVALUES_NAME =
"KSTREAM-TRANSFORMVALUES-";
private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
@@ -1209,77 +1207,6 @@ public class KStreamImpl<K, V> extends AbstractStream<K,
V> implements KStream<K
builder);
}
- @Override
- @Deprecated
- public <VR> KStream<K, VR> transformValues(final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends
VR> valueTransformerSupplier,
- final String...
stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- return doTransformValues(
- toValueTransformerWithKeySupplier(valueTransformerSupplier),
- NamedInternal.empty(),
- stateStoreNames);
- }
-
- @Override
- @Deprecated
- public <VR> KStream<K, VR> transformValues(final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V, ? extends
VR> valueTransformerSupplier,
- final Named named,
- final String...
stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- Objects.requireNonNull(named, "named can't be null");
- return doTransformValues(
- toValueTransformerWithKeySupplier(valueTransformerSupplier),
- new NamedInternal(named),
- stateStoreNames);
- }
-
- @Override
- @Deprecated
- public <VR> KStream<K, VR> transformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerSupplier,
- final String...
stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- return doTransformValues(valueTransformerSupplier,
NamedInternal.empty(), stateStoreNames);
- }
-
- @Override
- @Deprecated
- public <VR> KStream<K, VR> transformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerSupplier,
- final Named named,
- final String...
stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier,
"valueTransformerSupplier can't be null");
- Objects.requireNonNull(named, "named can't be null");
- return doTransformValues(valueTransformerSupplier, new
NamedInternal(named), stateStoreNames);
- }
-
- private <VR> KStream<K, VR> doTransformValues(final
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR>
valueTransformerWithKeySupplier,
- final NamedInternal named,
- final String...
stateStoreNames) {
- Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a
null array");
- for (final String stateStoreName : stateStoreNames) {
- Objects.requireNonNull(stateStoreName, "stateStoreNames can't
contain `null` as store name");
- }
- ApiUtils.checkSupplier(valueTransformerWithKeySupplier);
-
- final String name = named.orElseGenerateWithPrefix(builder,
TRANSFORMVALUES_NAME);
- final StatefulProcessorNode<? super K, ? super V> transformNode = new
StatefulProcessorNode<>(
- name,
- new ProcessorParameters<>(new
KStreamTransformValues<>(valueTransformerWithKeySupplier), name),
- stateStoreNames);
- transformNode.setValueChangingOperation(true);
-
- builder.addGraphNode(graphNode, transformNode);
-
- // cannot inherit value serde
- return new KStreamImpl<>(
- name,
- keySerde,
- null,
- subTopologySourceNodes,
- repartitionRequired,
- transformNode,
- builder);
- }
-
@Override
@Deprecated
public <VR> KStream<K, VR> flatTransformValues(final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<? super V,
Iterable<VR>> valueTransformerSupplier,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
deleted file mode 100644
index 1b767ef3969..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
-import
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Set;
-
-public class KStreamTransformValues<KIn, VIn, VOut> implements
ProcessorSupplier<KIn, VIn, KIn, VOut> {
-
- private final ValueTransformerWithKeySupplier<KIn, VIn, VOut>
valueTransformerSupplier;
-
- KStreamTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn,
VOut> valueTransformerSupplier) {
- this.valueTransformerSupplier = valueTransformerSupplier;
- }
-
- @Override
- public Processor<KIn, VIn, KIn, VOut> get() {
- return new
KStreamTransformValuesProcessor<>(valueTransformerSupplier.get());
- }
-
- @Override
- public Set<StoreBuilder<?>> stores() {
- return valueTransformerSupplier.stores();
- }
-
- public static class KStreamTransformValuesProcessor<KIn, VIn, VOut>
extends ContextualProcessor<KIn, VIn, KIn, VOut> {
-
- private final ValueTransformerWithKey<KIn, VIn, VOut> valueTransformer;
-
- KStreamTransformValuesProcessor(final ValueTransformerWithKey<KIn,
VIn, VOut> valueTransformer) {
- this.valueTransformer = valueTransformer;
- }
-
- @Override
- public void init(final ProcessorContext<KIn, VOut> context) {
- super.init(context);
- valueTransformer.init(new
ForwardingDisabledProcessorContext((InternalProcessorContext<KIn, VOut>)
context));
- }
-
- @Override
- public void process(final Record<KIn, VIn> record) {
-
context().forward(record.withValue(valueTransformer.transform(record.key(),
record.value())));
- }
-
- @Override
- public void close() {
- valueTransformer.close();
- }
- }
-}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
index 91824d5a5b8..108a7d7233b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Set;
@@ -91,10 +92,7 @@ import java.util.Set;
* @see Topology#addProcessor(String,
org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see
KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier,
String...)
* @see
KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier,
Named, String...)
- * @see
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
String...)
- * @see
KStream#transformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
Named, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, String...)
- * @see KStream#transformValues(ValueTransformerWithKeySupplier, Named,
String...)
+ * @see KStream#processValues(FixedKeyProcessorSupplier, String...)
* @see
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
String...)
* @see
KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier,
Named, String...)
* @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index aa646e873cf..6f79aee9dbe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -845,24 +845,6 @@ public class StreamsBuilderTest {
assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000",
STREAM_OPERATION_NAME);
}
- @Test
- @SuppressWarnings("deprecation")
- public void shouldUseSpecifiedNameForTransformValues() {
- builder.stream(STREAM_TOPIC).transformValues(() -> new
NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME));
- builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
- assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000",
STREAM_OPERATION_NAME);
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void shouldUseSpecifiedNameForTransformValuesWithKey() {
- builder.stream(STREAM_TOPIC).transformValues(() -> new
NoopValueTransformerWithKey<>(), Named.as(STREAM_OPERATION_NAME));
- builder.build();
- final ProcessorTopology topology =
builder.internalTopologyBuilder.rewriteTopology(new
StreamsConfig(props)).buildTopology();
- assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000",
STREAM_OPERATION_NAME);
- }
-
@Test
public void shouldUseSpecifiedNameForSplitOperation() {
builder.stream(STREAM_TOPIC)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 226fc357a6a..b78696f259a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -114,33 +114,6 @@ public class KStreamImplTest {
private final MockApiProcessorSupplier<String, String, Void, Void>
processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier<String, String, Void>
fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
@SuppressWarnings("deprecation")
- private final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String, String>
valueTransformerSupplier =
- () -> new org.apache.kafka.streams.kstream.ValueTransformer<String,
String>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public String transform(final String value) {
- return value;
- }
-
- @Override
- public void close() {}
- };
- private final ValueTransformerWithKeySupplier<String, String, String>
valueTransformerWithKeySupplier =
- () -> new ValueTransformerWithKey<String, String, String>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public String transform(final String key, final String value) {
- return value;
- }
-
- @Override
- public void close() {}
- };
- @SuppressWarnings("deprecation")
private final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<String,
Iterable<String>> flatValueTransformerSupplier =
() -> new org.apache.kafka.streams.kstream.ValueTransformer<String,
Iterable<String>>() {
@Override
@@ -1213,9 +1186,6 @@ public class KStreamImplTest {
assertEquals(((AbstractStream)
stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde());
assertNull(((AbstractStream)
stream1.flatMapValues(flatMapper)).valueSerde());
- assertEquals(((AbstractStream)
stream1.transformValues(valueTransformerSupplier)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream)
stream1.transformValues(valueTransformerSupplier)).valueSerde());
-
assertNull(((AbstractStream) stream1.merge(stream1)).keySerde());
assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde());
@@ -1589,7 +1559,7 @@ public class KStreamImplTest {
processorSupplier.get();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
- () -> testStream.process(() -> processor,
Named.as("flatTransformer"))
+ () -> testStream.process(() -> processor, Named.as("processor"))
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@@ -1606,272 +1576,49 @@ public class KStreamImplTest {
}
@Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowBadTransformerSupplierOnTransformValues() {
- final org.apache.kafka.streams.kstream.ValueTransformer<String,
String> transformer = valueTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessValues() {
+ final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String,
String, Void> processor =
+ fixedKeyProcessorSupplier.get();
final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> testStream.transformValues(() -> transformer)
+ IllegalArgumentException.class,
+ () -> testStream.processValues(() -> processor)
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowBadTransformerSupplierOnTransformValuesWithNamed() {
- final org.apache.kafka.streams.kstream.ValueTransformer<String,
String> transformer = valueTransformerSupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessValuesWithStores() {
+ final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String,
String, Void> processor =
+ fixedKeyProcessorSupplier.get();
final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> testStream.transformValues(() -> transformer,
Named.as("transformer"))
+ IllegalArgumentException.class,
+ () -> testStream.processValues(() -> processor, "storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullValueTransformerSupplierOnTransformValues() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () ->
testStream.transformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object,
Object>) null));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValues() {
- final ValueTransformerWithKey<String, String, String> transformer =
valueTransformerWithKeySupplier.get();
+ public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamed() {
+ final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String,
String, Void> processor =
+ fixedKeyProcessorSupplier.get();
final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> testStream.transformValues(() -> transformer)
+ IllegalArgumentException.class,
+ () -> testStream.processValues(() -> processor,
Named.as("processor"))
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
@Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowBadValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
- final ValueTransformerWithKey<String, String, String> transformer =
valueTransformerWithKeySupplier.get();
+ public void
shouldNotAllowBadProcessSupplierOnProcessValuesWithNamedAndStores() {
+ final org.apache.kafka.streams.processor.api.FixedKeyProcessor<String,
String, Void> processor =
+ fixedKeyProcessorSupplier.get();
final IllegalArgumentException exception = assertThrows(
- IllegalArgumentException.class,
- () -> testStream.transformValues(() -> transformer,
Named.as("transformer"))
+ IllegalArgumentException.class,
+ () -> testStream.processValues(() -> processor,
Named.as("processor"), "storeName")
);
assertThat(exception.getMessage(), containsString("#get() must return
a new object each time it is called."));
}
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValues() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () ->
testStream.transformValues((ValueTransformerWithKeySupplier<Object, Object,
Object>) null));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithStores() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
-
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>)
null,
- "storeName"));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithStores() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
- "storeName"));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamed() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
-
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>)
null,
- Named.as("valueTransformer")));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamed() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
- Named.as("valueTransformerWithKey")));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerSupplierOnTransformValuesWithNamedAndStores()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
-
(org.apache.kafka.streams.kstream.ValueTransformerSupplier<Object, Object>)
null,
- Named.as("valueTransformer"),
- "storeName"));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullValueTransformerWithKeySupplierOnTransformValuesWithNamedAndStores()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- (ValueTransformerWithKeySupplier<Object, Object, Object>) null,
- Named.as("valueTransformerWithKey"),
- "storeName"));
- assertThat(exception.getMessage(), equalTo("valueTransformerSupplier
can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplier() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier,
- (String[]) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplier()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- (String[]) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplier() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier, (String) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplier()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- (String) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerSupplierWithNamed()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier,
- Named.as("valueTransformer"),
- (String[]) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNamesOnTransformValuesWithValueTransformerWithKeySupplierWithNamed()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- Named.as("valueTransformer"),
- (String[]) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't be a
null array"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerSupplierWithNamed()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier,
- Named.as("valueTransformer"),
- (String) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullStoreNameOnTransformValuesWithValueTransformerWithKeySupplierWithName()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- Named.as("valueTransformerWithKey"),
- (String) null));
- assertThat(exception.getMessage(), equalTo("stateStoreNames can't
contain `null` as store name"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplier() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier,
- (Named) null));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplier() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- (Named) null));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerSupplierAndStores()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerSupplier,
- (Named) null,
- "storeName"));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
- @Test
- @SuppressWarnings("deprecation")
- public void
shouldNotAllowNullNamedOnTransformValuesWithValueTransformerWithKeySupplierAndStores()
{
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.transformValues(
- valueTransformerWithKeySupplier,
- (Named) null,
- "storeName"));
- assertThat(exception.getMessage(), equalTo("named can't be null"));
- }
-
@Test
@SuppressWarnings("deprecation")
public void
shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
deleted file mode 100644
index 5cb51ae7beb..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValueTimestamp;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.api.Processor;
-import
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.NoOpValueTransformerWithKeySupplier;
-import org.apache.kafka.test.StreamsTestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.mockito.Mockito.mock;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KStreamTransformValuesTest {
- private final String topicName = "topic";
- private final MockProcessorSupplier<Integer, Integer, Void, Void> supplier
= new MockProcessorSupplier<>();
- private final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.Integer());
- private InternalProcessorContext context =
mock(InternalProcessorContext.class);
-
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- @Test
- public void testTransform() {
- final StreamsBuilder builder = new StreamsBuilder();
-
- final
org.apache.kafka.streams.kstream.ValueTransformerSupplier<Number, Integer>
valueTransformerSupplier =
- () -> new
org.apache.kafka.streams.kstream.ValueTransformer<Number, Integer>() {
- private int total = 0;
-
- @Override
- public void init(final
org.apache.kafka.streams.processor.ProcessorContext context) { }
-
- @Override
- public Integer transform(final Number value) {
- total += value.intValue();
- return total;
- }
-
- @Override
- public void close() { }
- };
-
- final int[] expectedKeys = {1, 10, 100, 1000};
-
- final KStream<Integer, Integer> stream;
- stream = builder.stream(topicName, Consumed.with(Serdes.Integer(),
Serdes.Integer()));
- stream.transformValues(valueTransformerSupplier).process(supplier);
-
- try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- for (final int expectedKey : expectedKeys) {
- final TestInputTopic<Integer, Integer> inputTopic =
- driver.createInputTopic(topicName, new
IntegerSerializer(), new IntegerSerializer());
- inputTopic.pipeInput(expectedKey, expectedKey * 10,
expectedKey / 2L);
- }
- }
- final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 10,
0),
- new KeyValueTimestamp<>(10, 110, 5),
- new KeyValueTimestamp<>(100, 1110, 50),
- new KeyValueTimestamp<>(1000, 11110, 500)};
-
- assertArrayEquals(expected,
supplier.theCapturedProcessor().processed().toArray());
- }
-
- @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
- @Test
- public void testTransformWithKey() {
- final StreamsBuilder builder = new StreamsBuilder();
-
- final ValueTransformerWithKeySupplier<Integer, Number, Integer>
valueTransformerSupplier =
- () -> new ValueTransformerWithKey<Integer, Number, Integer>() {
- private int total = 0;
-
- @Override
- public void init(final
org.apache.kafka.streams.processor.ProcessorContext context) { }
-
- @Override
- public Integer transform(final Integer readOnlyKey, final
Number value) {
- total += value.intValue() + readOnlyKey;
- return total;
- }
-
- @Override
- public void close() { }
- };
-
- final int[] expectedKeys = {1, 10, 100, 1000};
-
- final KStream<Integer, Integer> stream;
- stream = builder.stream(topicName, Consumed.with(Serdes.Integer(),
Serdes.Integer()));
- stream.transformValues(valueTransformerSupplier).process(supplier);
-
- try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
- final TestInputTopic<Integer, Integer> inputTopic =
- driver.createInputTopic(topicName, new
IntegerSerializer(), new IntegerSerializer());
- for (final int expectedKey : expectedKeys) {
- inputTopic.pipeInput(expectedKey, expectedKey * 10,
expectedKey / 2L);
- }
- }
- final KeyValueTimestamp[] expected = {new KeyValueTimestamp<>(1, 11,
0),
- new KeyValueTimestamp<>(10, 121, 5),
- new KeyValueTimestamp<>(100, 1221, 50),
- new KeyValueTimestamp<>(1000, 12221, 500)};
-
- assertArrayEquals(expected,
supplier.theCapturedProcessor().processed().toArray());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void
shouldInitializeTransformerWithForwardDisabledProcessorContext() {
- final NoOpValueTransformerWithKeySupplier<String, String> transformer
= new NoOpValueTransformerWithKeySupplier<>();
- final KStreamTransformValues<String, String, String> transformValues =
new KStreamTransformValues<>(transformer);
- final Processor<String, String, String, String> processor =
transformValues.get();
-
- processor.init(context);
-
- assertThat(transformer.context, isA((Class)
ForwardingDisabledProcessorContext.class));
- }
-}
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 5e6cc4f3f22..80d43fc2315 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -584,94 +584,6 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
): KStream[K, VR] =
new KStream(inner.flatTransformValues[VR](valueTransformerSupplier.asJava,
named, stateStoreNames: _*))
- /**
- * Transform the value of each input record into a new value (with possible
new type) of the output record.
- * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
- * record value and computes a new value for it.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `ValueTransformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param valueTransformerSupplier a instance of `ValueTransformerSupplier`
that generates a `ValueTransformer`
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
- @deprecated(since = "3.3", message = "Use
processValues(FixedKeyProcessorSupplier, String*) instead.")
- def transformValues[VR](
- valueTransformerSupplier: ValueTransformerSupplier[V, VR],
- stateStoreNames: String*
- ): KStream[K, VR] =
- new KStream(inner.transformValues[VR](valueTransformerSupplier,
stateStoreNames: _*))
-
- /**
- * Transform the value of each input record into a new value (with possible
new type) of the output record.
- * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
- * record value and computes a new value for it.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `ValueTransformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param valueTransformerSupplier a instance of `ValueTransformerSupplier`
that generates a `ValueTransformer`
- * @param named a [[Named]] config used to name the
processor in the topology
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
- @deprecated(since = "3.3", message = "Use
processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
- def transformValues[VR](
- valueTransformerSupplier: ValueTransformerSupplier[V, VR],
- named: Named,
- stateStoreNames: String*
- ): KStream[K, VR] =
- new KStream(inner.transformValues[VR](valueTransformerSupplier, named,
stateStoreNames: _*))
-
- /**
- * Transform the value of each input record into a new value (with possible
new type) of the output record.
- * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
- * record value and computes a new value for it.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `ValueTransformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param valueTransformerSupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
- @deprecated(since = "3.3", message = "Use
processValues(FixedKeyProcessorSupplier, String*) instead.")
- def transformValues[VR](
- valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
- stateStoreNames: String*
- ): KStream[K, VR] =
- new KStream(inner.transformValues[VR](valueTransformerSupplier,
stateStoreNames: _*))
-
- /**
- * Transform the value of each input record into a new value (with possible
new type) of the output record.
- * A `ValueTransformer` (provided by the given `ValueTransformerSupplier`)
is applied to each input
- * record value and computes a new value for it.
- * In order to assign a state, the state must be created and added via
`addStateStore` before they can be connected
- * to the `ValueTransformer`.
- * It's not required to connect global state stores that are added via
`addGlobalStore`;
- * read-only access to global state stores is available by default.
- *
- * @param valueTransformerSupplier a instance of
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`
- * @param named a [[Named]] config used to name the
processor in the topology
- * @param stateStoreNames the names of the state stores used by the
processor
- * @return a [[KStream]] that contains records with unmodified key and new
values (possibly of different type)
- * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
- */
- @deprecated(since = "3.3", message = "Use
processValues(FixedKeyProcessorSupplier, Named, String*) instead.")
- def transformValues[VR](
- valueTransformerSupplier: ValueTransformerWithKeySupplier[K, V, VR],
- named: Named,
- stateStoreNames: String*
- ): KStream[K, VR] =
- new KStream(inner.transformValues[VR](valueTransformerSupplier, named,
stateStoreNames: _*))
-
/**
* Process all records in this stream, one record at a time, by applying a
`Processor` (provided by the given
* `processorSupplier`).