This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1123a76110b KAFKA-13722: remove internal usage of old ProcessorContext
(#18698)
1123a76110b is described below
commit 1123a76110b5f5a5eb7ac77321655a2a8adc4f83
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 29 11:13:57 2025 -0800
KAFKA-13722: remove internal usage of old ProcessorContext (#18698)
Reviewers: Lucas Brutschy <[email protected]>
---
.../org/apache/kafka/streams/kstream/KTable.java | 2 +-
.../internals/KStreamKTableJoinProcessor.java | 6 +-
.../kafka/streams/processor/ProcessorContext.java | 6 +-
.../apache/kafka/streams/processor/Punctuator.java | 11 +-
.../processor/internals/ProcessorContextUtils.java | 12 +--
.../AbstractRocksDBSegmentedBytesStore.java | 5 +-
.../internals/ChangeLoggingKeyValueBytesStore.java | 2 +-
.../internals/RocksDBSegmentedBytesStore.java | 2 +-
.../RocksDBTimestampedSegmentedBytesStore.java | 2 +-
.../streams/kstream/internals/KTableImplTest.java | 120 ++++++++++++---------
10 files changed, 92 insertions(+), 76 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1c8fb3fea39..7082355eb45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -1054,7 +1054,7 @@ public interface KTable<K, V> {
* {@link StreamsConfig} via parameter {@link
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>" is
* an internally generated name, and "-repartition" is a fixed suffix.
*
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
+ * <p>You can retrieve all generated internal topic names via {@link
Topology#describe()}.
*
* <p>
* All data of this {@code KTable} will be redistributed through the
repartitioning topic by writing all update
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index e81877c99e7..637d870ee7e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
private final Optional<Duration> gracePeriod;
private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
- private InternalProcessorContext internalProcessorContext;
+ private InternalProcessorContext<K1, VOut> internalProcessorContext;
private final boolean useBuffer;
private final String storeName;
@@ -78,7 +78,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
valueGetter.init(context);
- internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context);
+ internalProcessorContext = asInternalProcessorContext(context);
if (useBuffer) {
if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
@@ -90,7 +90,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
@Override
public void process(final Record<K1, V1> record) {
- internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context());
updateObservedStreamTime(record.timestamp());
if (maybeDropRecord(record)) {
return;
@@ -123,7 +122,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
observedStreamTime = Math.max(observedStreamTime, timestamp);
}
- @SuppressWarnings("unchecked")
private void doJoin(final Record<K1, V1> record) {
final K2 mappedKey = keyMapper.apply(record.key(), record.value());
final V2 value2 = getValue2(record, mappedKey);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 3d057c5ce2b..d65244bcc86 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -41,7 +41,11 @@ import java.util.Map;
* We need to clean this all up
(https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
* deprecated afterward.
*/
-@SuppressWarnings("deprecation")
+@SuppressWarnings("deprecation") // Not deprecating the old context, since it
is used by Transformers. See KAFKA-10603.
+/*
+ * When we deprecate `ProcessorContext`, we can also deprecate the `To` class,
+ * as it is only used in the `ProcessorContext#forward` method.
+ */
public interface ProcessorContext {
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index dd533ad7ba2..9b76962f3b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -21,7 +21,8 @@ import org.apache.kafka.streams.processor.api.Record;
import java.time.Duration;
/**
- * A functional interface used as an argument to {@link
ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
+ * A functional interface used as an argument to
+ * {@link
org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration,
PunctuationType, Punctuator)}.
*
* @see Cancellable
*/
@@ -30,14 +31,16 @@ public interface Punctuator {
/**
* Perform the scheduled periodic operation.
*
- * <p> If this method accesses {@link ProcessorContext} or
+ * <p> If this method accesses {@link
org.apache.kafka.streams.processor.api.ProcessorContext} or
* {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record
metadata like topic,
* partition, and offset or {@link
org.apache.kafka.streams.processor.api.RecordMetadata} won't
* be available.
*
- * <p> Furthermore, for any record that is sent downstream via {@link
ProcessorContext#forward(Object, Object)}
+ * <p> Furthermore, for any record that is sent downstream via
+ * {@link
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}
* or {@link
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
- * won't be any record metadata. If {@link
ProcessorContext#forward(Object, Object)} is used,
+ * won't be any record metadata. If
+ * {@link
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is
used,
* it's also not possible to set records headers.
*
* @param timestamp when the operation is being called, depending on
{@link PunctuationType}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 20890088999..0515f8718aa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import java.util.Map;
@@ -64,8 +64,9 @@ public final class ProcessorContextUtils {
}
}
- @SuppressWarnings("unchecked")
- public static <K, V> InternalProcessorContext<K, V>
asInternalProcessorContext(final ProcessorContext context) {
+ public static <K, V> InternalProcessorContext<K, V>
asInternalProcessorContext(
+ final ProcessorContext<K, V> context
+ ) {
if (context instanceof InternalProcessorContext) {
return (InternalProcessorContext<K, V>) context;
} else {
@@ -75,10 +76,9 @@ public final class ProcessorContextUtils {
}
}
- @SuppressWarnings("unchecked")
- public static <K, V> InternalProcessorContext<K, V>
asInternalProcessorContext(final StateStoreContext context) {
+ public static InternalProcessorContext<?, ?>
asInternalProcessorContext(final StateStoreContext context) {
if (context instanceof InternalProcessorContext) {
- return (InternalProcessorContext<K, V>) context;
+ return (InternalProcessorContext<?, ?>) context;
} else {
throw new IllegalArgumentException(
"This component requires internal features of Kafka Streams
and must be disabled for unit tests."
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index f5b4366ae98..bde8d831919 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
private final String name;
private final AbstractSegments<S> segments;
- private final String metricScope;
private final long retentionPeriod;
private final KeySchema keySchema;
- private InternalProcessorContext internalProcessorContext;
+ private InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
@@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends
Segment> implements Se
private volatile boolean open;
AbstractRocksDBSegmentedBytesStore(final String name,
- final String metricScope,
final long retentionPeriod,
final KeySchema keySchema,
final AbstractSegments<S> segments) {
this.name = name;
- this.metricScope = metricScope;
this.retentionPeriod = retentionPeriod;
this.keySchema = keySchema;
this.segments = segments;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 5405ad9a71c..9c1c3f9ae76 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -131,4 +131,4 @@ public class ChangeLoggingKeyValueBytesStore
void log(final Bytes key, final byte[] value, final long timestamp) {
internalContext.logChange(name(), key, value, timestamp,
wrapped().getPosition());
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index e7b7198d1cf..33e787adb53 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends
AbstractRocksDBSegmentedBytesSto
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, retention, keySchema, new
KeyValueSegments(name, metricsScope, retention, segmentInterval));
+ super(name, retention, keySchema, new KeyValueSegments(name,
metricsScope, retention, segmentInterval));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 39f493c761b..2f6bcc5c052 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends
AbstractRocksDBSegmen
final long retention,
final long segmentInterval,
final KeySchema keySchema) {
- super(name, metricsScope, retention, keySchema, new
TimestampedSegments(name, metricsScope, retention, segmentInterval));
+ super(name, retention, keySchema, new TimestampedSegments(name,
metricsScope, retention, segmentInterval));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ebc06819631..a293625dc30 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -233,7 +233,7 @@ public class KTableImplTest {
final ValueMapper<String, String> mapper = value -> value;
final ValueJoiner<String, String, String> joiner = (value1, value2) ->
value1;
final ValueTransformerWithKeySupplier<String, String, String>
valueTransformerWithKeySupplier =
- () -> new ValueTransformerWithKey<String, String, String>() {
+ () -> new ValueTransformerWithKey<>() {
@Override
public void init(final ProcessorContext context) {}
@@ -247,103 +247,103 @@ public class KTableImplTest {
};
assertEquals(
- ((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
+ ((AbstractStream<String, String>) table1.filter((key, value) ->
false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
- ((AbstractStream) table1.filter((key, value) ->
false)).valueSerde(),
+ ((AbstractStream<String, String>) table1.filter((key, value) ->
false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
- ((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.filter((key, value) ->
false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.filter((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.filter((key, value) ->
false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.filterNot((key, value) ->
false)).keySerde(),
+ ((AbstractStream<String, String>) table1.filterNot((key, value) ->
false)).keySerde(),
consumedInternal.keySerde());
assertEquals(
- ((AbstractStream) table1.filterNot((key, value) ->
false)).valueSerde(),
+ ((AbstractStream<String, String>) table1.filterNot((key, value) ->
false)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
- ((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.filterNot((key, value) ->
false, Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.filterNot((key, value) -> false,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.filterNot((key, value) ->
false, Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.mapValues(mapper)).keySerde(),
+ ((AbstractStream<String, String>)
table1.mapValues(mapper)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+ assertNull(((AbstractStream<String, String>)
table1.mapValues(mapper)).valueSerde());
assertEquals(
- ((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.mapValues(mapper,
Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.toStream()).keySerde(),
+ ((AbstractStream<String, String>) table1.toStream()).keySerde(),
consumedInternal.keySerde());
assertEquals(
- ((AbstractStream) table1.toStream()).valueSerde(),
+ ((AbstractStream<String, String>) table1.toStream()).valueSerde(),
consumedInternal.valueSerde());
- assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+ assertNull(((AbstractStream<String, String>)
table1.toStream(selector)).keySerde());
assertEquals(
- ((AbstractStream) table1.toStream(selector)).valueSerde(),
+ ((AbstractStream<String, String>)
table1.toStream(selector)).valueSerde(),
consumedInternal.valueSerde());
assertEquals(
- ((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
+ ((AbstractStream<String, String>)
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+ assertNull(((AbstractStream<String, String>)
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
assertEquals(
- ((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
- assertEquals(((AbstractStream)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ assertEquals(((AbstractStream<String, String>)
table1.transformValues(valueTransformerWithKeySupplier,
Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
- assertNull(((AbstractStream)
table1.groupBy(KeyValue::new)).keySerde());
- assertNull(((AbstractStream)
table1.groupBy(KeyValue::new)).valueSerde());
+ assertNull(((AbstractStream<String, String>)
table1.groupBy(KeyValue::new)).keySerde());
+ assertNull(((AbstractStream<String, String>)
table1.groupBy(KeyValue::new)).valueSerde());
assertEquals(
- ((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.groupBy(KeyValue::new,
Grouped.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.join(table1, joiner)).keySerde(),
+ ((AbstractStream<String, String>) table1.join(table1,
joiner)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream) table1.join(table1,
joiner)).valueSerde());
+ assertNull(((AbstractStream<String, String>) table1.join(table1,
joiner)).valueSerde());
assertEquals(
- ((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.join(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
+ ((AbstractStream<String, String>) table1.leftJoin(table1,
joiner)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream) table1.leftJoin(table1,
joiner)).valueSerde());
+ assertNull(((AbstractStream<String, String>) table1.leftJoin(table1,
joiner)).valueSerde());
assertEquals(
- ((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.leftJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
+ ((AbstractStream<String, String>) table1.outerJoin(table1,
joiner)).keySerde(),
consumedInternal.keySerde());
- assertNull(((AbstractStream) table1.outerJoin(table1,
joiner)).valueSerde());
+ assertNull(((AbstractStream<String, String>) table1.outerJoin(table1,
joiner)).valueSerde());
assertEquals(
- ((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
+ ((AbstractStream<String, String>) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).keySerde(),
mySerde);
assertEquals(
- ((AbstractStream) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
+ ((AbstractStream<String, String>) table1.outerJoin(table1, joiner,
Materialized.with(mySerde, mySerde))).valueSerde(),
mySerde);
}
@@ -462,25 +462,25 @@ public class KTableImplTest {
assertTopologyContainsProcessor(topology,
"KSTREAM-SINK-0000000007");
assertTopologyContainsProcessor(topology,
"KSTREAM-SOURCE-0000000008");
- final Field valSerializerField = ((SinkNode)
driver.getProcessor("KSTREAM-SINK-0000000003"))
+ final Field valSerializerField = ((SinkNode<?, ?>)
driver.getProcessor("KSTREAM-SINK-0000000003"))
.getClass()
.getDeclaredField("valSerializer");
- final Field valDeserializerField = ((SourceNode)
driver.getProcessor("KSTREAM-SOURCE-0000000004"))
+ final Field valDeserializerField = ((SourceNode<?, ?>)
driver.getProcessor("KSTREAM-SOURCE-0000000004"))
.getClass()
.getDeclaredField("valDeserializer");
valSerializerField.setAccessible(true);
valDeserializerField.setAccessible(true);
- assertNotNull(((ChangedSerializer)
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
- assertNotNull(((ChangedDeserializer)
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
- assertNotNull(((ChangedSerializer)
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
- assertNotNull(((ChangedDeserializer)
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+ assertNotNull(((ChangedSerializer<?>)
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+ assertNotNull(((ChangedDeserializer<?>)
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+ assertNotNull(((ChangedSerializer<?>)
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+ assertNotNull(((ChangedDeserializer<?>)
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
}
}
@Test
public void shouldNotAllowNullSelectorOnToStream() {
- assertThrows(NullPointerException.class, () ->
table.toStream((KeyValueMapper) null));
+ assertThrows(NullPointerException.class, () ->
table.toStream((KeyValueMapper<String, String, ?>) null));
}
@Test
@@ -495,12 +495,12 @@ public class KTableImplTest {
@Test
public void shouldNotAllowNullMapperOnMapValues() {
- assertThrows(NullPointerException.class, () ->
table.mapValues((ValueMapper) null));
+ assertThrows(NullPointerException.class, () ->
table.mapValues((ValueMapper<String, ?>) null));
}
@Test
public void shouldNotAllowNullMapperOnMapValueWithKey() {
- assertThrows(NullPointerException.class, () ->
table.mapValues((ValueMapperWithKey) null));
+ assertThrows(NullPointerException.class, () ->
table.mapValues((ValueMapperWithKey<String, String, ?>) null));
}
@Test
@@ -545,27 +545,42 @@ public class KTableImplTest {
@Test
public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
- assertThrows(NullPointerException.class, () -> table.filter((key,
value) -> false, (Materialized) null));
+ assertThrows(
+ NullPointerException.class,
+ () -> table.filter((key, value) -> false, (Materialized<String,
String, KeyValueStore<Bytes, byte[]>>) null)
+ );
}
@Test
public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
- assertThrows(NullPointerException.class, () -> table.filterNot((key,
value) -> false, (Materialized) null));
+ assertThrows(
+ NullPointerException.class,
+ () -> table.filterNot((key, value) -> false, (Materialized<String,
String, KeyValueStore<Bytes, byte[]>>) null)
+ );
}
@Test
public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
- assertThrows(NullPointerException.class, () -> table.join(table,
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+ assertThrows(
+ NullPointerException.class,
+ () -> table.join(table, MockValueJoiner.TOSTRING_JOINER,
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+ );
}
@Test
public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
- assertThrows(NullPointerException.class, () -> table.leftJoin(table,
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+ assertThrows(
+ NullPointerException.class,
+ () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER,
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+ );
}
@Test
public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
- assertThrows(NullPointerException.class, () -> table.outerJoin(table,
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+ assertThrows(
+ NullPointerException.class,
+ () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER,
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+ );
}
@Test
@@ -573,12 +588,11 @@ public class KTableImplTest {
assertThrows(NullPointerException.class, () ->
table.transformValues(null));
}
- @SuppressWarnings("unchecked")
@Test
public void
shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
final ValueTransformerWithKeySupplier<String, String, ?>
valueTransformerSupplier =
mock(ValueTransformerWithKeySupplier.class);
- assertThrows(NullPointerException.class, () ->
table.transformValues(valueTransformerSupplier, (Materialized) null));
+ assertThrows(NullPointerException.class, () ->
table.transformValues(valueTransformerSupplier, (Materialized<String, Object,
KeyValueStore<Bytes, byte[]>>) null));
}
@Test