This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1123a76110b KAFKA-13722: remove internal usage of old ProcessorContext (#18698)
1123a76110b is described below

commit 1123a76110b5f5a5eb7ac77321655a2a8adc4f83
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Jan 29 11:13:57 2025 -0800

    KAFKA-13722: remove internal usage of old ProcessorContext (#18698)
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../org/apache/kafka/streams/kstream/KTable.java   |   2 +-
 .../internals/KStreamKTableJoinProcessor.java      |   6 +-
 .../kafka/streams/processor/ProcessorContext.java  |   6 +-
 .../apache/kafka/streams/processor/Punctuator.java |  11 +-
 .../processor/internals/ProcessorContextUtils.java |  12 +--
 .../AbstractRocksDBSegmentedBytesStore.java        |   5 +-
 .../internals/ChangeLoggingKeyValueBytesStore.java |   2 +-
 .../internals/RocksDBSegmentedBytesStore.java      |   2 +-
 .../RocksDBTimestampedSegmentedBytesStore.java     |   2 +-
 .../streams/kstream/internals/KTableImplTest.java  | 120 ++++++++++++---------
 10 files changed, 92 insertions(+), 76 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 1c8fb3fea39..7082355eb45 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -1054,7 +1054,7 @@ public interface KTable<K, V> {
      * {@link  StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "&lt;name&gt;" is
      * an internally generated name, and "-repartition" is a fixed suffix.
      *
-     * You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
+     * <p>You can retrieve all generated internal topic names via {@link 
Topology#describe()}.
      *
      * <p>
      * All data of this {@code KTable} will be redistributed through the 
repartitioning topic by writing all update
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index e81877c99e7..637d870ee7e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -53,7 +53,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
     private final Optional<Duration> gracePeriod;
     private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<K1, VOut> internalProcessorContext;
     private final boolean useBuffer;
     private final String storeName;
 
@@ -78,7 +78,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
-        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        internalProcessorContext = asInternalProcessorContext(context);
         if (useBuffer) {
             if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
                 throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
@@ -90,7 +90,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
 
     @Override
     public void process(final Record<K1, V1> record) {
-        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
         updateObservedStreamTime(record.timestamp());
         if (maybeDropRecord(record)) {
             return;
@@ -123,7 +122,6 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         observedStreamTime = Math.max(observedStreamTime, timestamp);
     }
 
-    @SuppressWarnings("unchecked")
     private void doJoin(final Record<K1, V1> record) {
         final K2 mappedKey = keyMapper.apply(record.key(), record.value());
         final V2 value2 = getValue2(record, mappedKey);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 3d057c5ce2b..d65244bcc86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -41,7 +41,11 @@ import java.util.Map;
  * We need to clean this all up 
(https://issues.apache.org/jira/browse/KAFKA-17131) and mark the interface
  * deprecated afterward.
  */
-@SuppressWarnings("deprecation")
+@SuppressWarnings("deprecation") // Not deprecating the old context, since it is used by Transformers. See KAFKA-10603.
+/*
+ * When we deprecate `ProcessorContext`, we can also deprecate the `To` class,
+ * as it is only used in the `ProcessorContext#forward` method.
+ */
 public interface ProcessorContext {
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
index dd533ad7ba2..9b76962f3b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Punctuator.java
@@ -21,7 +21,8 @@ import org.apache.kafka.streams.processor.api.Record;
 import java.time.Duration;
 
 /**
- * A functional interface used as an argument to {@link 
ProcessorContext#schedule(Duration, PunctuationType, Punctuator)}.
+ * A functional interface used as an argument to
+ * {@link 
org.apache.kafka.streams.processor.api.ProcessorContext#schedule(Duration, 
PunctuationType, Punctuator)}.
  *
  * @see Cancellable
  */
@@ -30,14 +31,16 @@ public interface Punctuator {
     /**
      * Perform the scheduled periodic operation.
      *
-     * <p> If this method accesses {@link ProcessorContext} or
+     * <p> If this method accesses {@link 
org.apache.kafka.streams.processor.api.ProcessorContext} or
      * {@link org.apache.kafka.streams.processor.api.ProcessorContext}, record 
metadata like topic,
      * partition, and offset or {@link 
org.apache.kafka.streams.processor.api.RecordMetadata} won't
      * be available.
      *
-     * <p> Furthermore, for any record that is sent downstream via {@link 
ProcessorContext#forward(Object, Object)}
+     * <p> Furthermore, for any record that is sent downstream via
+     * {@link 
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}
      * or {@link 
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)}, there
-     * won't be any record metadata. If {@link 
ProcessorContext#forward(Object, Object)} is used,
+     * won't be any record metadata. If
+     * {@link 
org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record)} is 
used,
      * it's also not possible to set records headers.
      *
      * @param timestamp when the operation is being called, depending on 
{@link PunctuationType}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index 20890088999..0515f8718aa 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Map;
@@ -64,8 +64,9 @@ public final class ProcessorContextUtils {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> InternalProcessorContext<K, V> 
asInternalProcessorContext(final ProcessorContext context) {
+    public static <K, V> InternalProcessorContext<K, V> 
asInternalProcessorContext(
+        final ProcessorContext<K, V> context
+    ) {
         if (context instanceof InternalProcessorContext) {
             return (InternalProcessorContext<K, V>) context;
         } else {
@@ -75,10 +76,9 @@ public final class ProcessorContextUtils {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public static <K, V> InternalProcessorContext<K, V> 
asInternalProcessorContext(final StateStoreContext context) {
+    public static InternalProcessorContext<?, ?> 
asInternalProcessorContext(final StateStoreContext context) {
         if (context instanceof InternalProcessorContext) {
-            return (InternalProcessorContext<K, V>) context;
+            return (InternalProcessorContext<?, ?>) context;
         } else {
             throw new IllegalArgumentException(
                 "This component requires internal features of Kafka Streams 
and must be disabled for unit tests."
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index f5b4366ae98..bde8d831919 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -53,11 +53,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
 
     private final String name;
     private final AbstractSegments<S> segments;
-    private final String metricScope;
     private final long retentionPeriod;
     private final KeySchema keySchema;
 
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -66,12 +65,10 @@ public class AbstractRocksDBSegmentedBytesStore<S extends 
Segment> implements Se
     private volatile boolean open;
 
     AbstractRocksDBSegmentedBytesStore(final String name,
-                                       final String metricScope,
                                        final long retentionPeriod,
                                        final KeySchema keySchema,
                                        final AbstractSegments<S> segments) {
         this.name = name;
-        this.metricScope = metricScope;
         this.retentionPeriod = retentionPeriod;
         this.keySchema = keySchema;
         this.segments = segments;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 5405ad9a71c..9c1c3f9ae76 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -131,4 +131,4 @@ public class ChangeLoggingKeyValueBytesStore
     void log(final Bytes key, final byte[] value, final long timestamp) {
         internalContext.logChange(name(), key, value, timestamp, 
wrapped().getPosition());
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index e7b7198d1cf..33e787adb53 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBSegmentedBytesStore extends 
AbstractRocksDBSegmentedBytesSto
                                final long retention,
                                final long segmentInterval,
                                final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new 
KeyValueSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new KeyValueSegments(name, 
metricsScope, retention, segmentInterval));
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
index 39f493c761b..2f6bcc5c052 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStore.java
@@ -23,6 +23,6 @@ public class RocksDBTimestampedSegmentedBytesStore extends 
AbstractRocksDBSegmen
                                           final long retention,
                                           final long segmentInterval,
                                           final KeySchema keySchema) {
-        super(name, metricsScope, retention, keySchema, new 
TimestampedSegments(name, metricsScope, retention, segmentInterval));
+        super(name, retention, keySchema, new TimestampedSegments(name, 
metricsScope, retention, segmentInterval));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index ebc06819631..a293625dc30 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -233,7 +233,7 @@ public class KTableImplTest {
         final ValueMapper<String, String> mapper = value -> value;
         final ValueJoiner<String, String, String> joiner = (value1, value2) -> 
value1;
         final ValueTransformerWithKeySupplier<String, String, String> 
valueTransformerWithKeySupplier =
-            () -> new ValueTransformerWithKey<String, String, String>() {
+            () -> new ValueTransformerWithKey<>() {
                 @Override
                 public void init(final ProcessorContext context) {}
 
@@ -247,103 +247,103 @@ public class KTableImplTest {
             };
 
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false)).keySerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> 
false)).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> 
false)).valueSerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> 
false)).valueSerde(),
             consumedInternal.valueSerde());
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> 
false, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.filter((key, value) -> false, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.filter((key, value) -> 
false, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> 
false)).keySerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> 
false)).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> 
false)).valueSerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> 
false)).valueSerde(),
             consumedInternal.valueSerde());
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> 
false, Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.filterNot((key, value) -> false, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.filterNot((key, value) -> 
false, Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper)).keySerde(),
+            ((AbstractStream<String, String>) 
table1.mapValues(mapper)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde());
+        assertNull(((AbstractStream<String, String>) 
table1.mapValues(mapper)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.mapValues(mapper, 
Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.toStream()).keySerde(),
+            ((AbstractStream<String, String>) table1.toStream()).keySerde(),
             consumedInternal.keySerde());
         assertEquals(
-            ((AbstractStream) table1.toStream()).valueSerde(),
+            ((AbstractStream<String, String>) table1.toStream()).valueSerde(),
             consumedInternal.valueSerde());
-        assertNull(((AbstractStream) table1.toStream(selector)).keySerde());
+        assertNull(((AbstractStream<String, String>) 
table1.toStream(selector)).keySerde());
         assertEquals(
-            ((AbstractStream) table1.toStream(selector)).valueSerde(),
+            ((AbstractStream<String, String>) 
table1.toStream(selector)).valueSerde(),
             consumedInternal.valueSerde());
 
         assertEquals(
-            ((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
+            ((AbstractStream<String, String>) 
table1.transformValues(valueTransformerWithKeySupplier)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
+        assertNull(((AbstractStream<String, String>) 
table1.transformValues(valueTransformerWithKeySupplier)).valueSerde());
         assertEquals(
-            ((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
-        assertEquals(((AbstractStream) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+        assertEquals(((AbstractStream<String, String>) 
table1.transformValues(valueTransformerWithKeySupplier, 
Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
-        assertNull(((AbstractStream) 
table1.groupBy(KeyValue::new)).keySerde());
-        assertNull(((AbstractStream) 
table1.groupBy(KeyValue::new)).valueSerde());
+        assertNull(((AbstractStream<String, String>) 
table1.groupBy(KeyValue::new)).keySerde());
+        assertNull(((AbstractStream<String, String>) 
table1.groupBy(KeyValue::new)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.groupBy(KeyValue::new, 
Grouped.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.groupBy(KeyValue::new, 
Grouped.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.groupBy(KeyValue::new, 
Grouped.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.groupBy(KeyValue::new, 
Grouped.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.join(table1, 
joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.join(table1, 
joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.join(table1, 
joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.join(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, 
joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.leftJoin(table1, 
joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.leftJoin(table1, 
joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.leftJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
 
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, 
joiner)).keySerde(),
             consumedInternal.keySerde());
-        assertNull(((AbstractStream) table1.outerJoin(table1, 
joiner)).valueSerde());
+        assertNull(((AbstractStream<String, String>) table1.outerJoin(table1, 
joiner)).valueSerde());
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).keySerde(),
             mySerde);
         assertEquals(
-            ((AbstractStream) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
+            ((AbstractStream<String, String>) table1.outerJoin(table1, joiner, 
Materialized.with(mySerde, mySerde))).valueSerde(),
             mySerde);
     }
 
@@ -462,25 +462,25 @@ public class KTableImplTest {
             assertTopologyContainsProcessor(topology, 
"KSTREAM-SINK-0000000007");
             assertTopologyContainsProcessor(topology, 
"KSTREAM-SOURCE-0000000008");
 
-            final Field valSerializerField = ((SinkNode) 
driver.getProcessor("KSTREAM-SINK-0000000003"))
+            final Field valSerializerField = ((SinkNode<?, ?>) 
driver.getProcessor("KSTREAM-SINK-0000000003"))
                 .getClass()
                 .getDeclaredField("valSerializer");
-            final Field valDeserializerField = ((SourceNode) 
driver.getProcessor("KSTREAM-SOURCE-0000000004"))
+            final Field valDeserializerField = ((SourceNode<?, ?>) 
driver.getProcessor("KSTREAM-SOURCE-0000000004"))
                 .getClass()
                 .getDeclaredField("valDeserializer");
             valSerializerField.setAccessible(true);
             valDeserializerField.setAccessible(true);
 
-            assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
-            assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
-            assertNotNull(((ChangedSerializer) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
-            assertNotNull(((ChangedDeserializer) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+            assertNotNull(((ChangedSerializer<?>) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+            assertNotNull(((ChangedDeserializer<?>) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+            assertNotNull(((ChangedSerializer<?>) 
valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+            assertNotNull(((ChangedDeserializer<?>) 
valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
         }
     }
 
     @Test
     public void shouldNotAllowNullSelectorOnToStream() {
-        assertThrows(NullPointerException.class, () -> 
table.toStream((KeyValueMapper) null));
+        assertThrows(NullPointerException.class, () -> 
table.toStream((KeyValueMapper<String, String, ?>) null));
     }
 
     @Test
@@ -495,12 +495,12 @@ public class KTableImplTest {
 
     @Test
     public void shouldNotAllowNullMapperOnMapValues() {
-        assertThrows(NullPointerException.class, () -> 
table.mapValues((ValueMapper) null));
+        assertThrows(NullPointerException.class, () -> 
table.mapValues((ValueMapper<String, ?>) null));
     }
 
     @Test
     public void shouldNotAllowNullMapperOnMapValueWithKey() {
-        assertThrows(NullPointerException.class, () -> 
table.mapValues((ValueMapperWithKey) null));
+        assertThrows(NullPointerException.class, () -> 
table.mapValues((ValueMapperWithKey<String, String, ?>) null));
     }
 
     @Test
@@ -545,27 +545,42 @@ public class KTableImplTest {
 
     @Test
     public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.filter((key, 
value) -> false, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.filter((key, value) -> false, (Materialized<String, 
String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.filterNot((key, 
value) -> false, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.filterNot((key, value) -> false, (Materialized<String, 
String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.join(table, 
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.join(table, MockValueJoiner.TOSTRING_JOINER, 
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.leftJoin(table, 
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, 
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
     public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
-        assertThrows(NullPointerException.class, () -> table.outerJoin(table, 
MockValueJoiner.TOSTRING_JOINER, (Materialized) null));
+        assertThrows(
+            NullPointerException.class,
+            () -> table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, 
(Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null)
+        );
     }
 
     @Test
@@ -573,12 +588,11 @@ public class KTableImplTest {
         assertThrows(NullPointerException.class, () -> 
table.transformValues(null));
     }
 
-    @SuppressWarnings("unchecked")
     @Test
     public void 
shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
         final ValueTransformerWithKeySupplier<String, String, ?> 
valueTransformerSupplier =
             mock(ValueTransformerWithKeySupplier.class);
-        assertThrows(NullPointerException.class, () -> 
table.transformValues(valueTransformerSupplier, (Materialized) null));
+        assertThrows(NullPointerException.class, () -> 
table.transformValues(valueTransformerSupplier, (Materialized<String, Object, 
KeyValueStore<Bytes, byte[]>>) null));
     }
 
     @Test

Reply via email to