This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 990c8c750c5 MINOR: remove old procesor API 
MockInternalProcessorContext (#18103)
990c8c750c5 is described below

commit 990c8c750c51c0947a2242313066a26d1262ac9d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 11 15:09:13 2024 -0800

    MINOR: remove old procesor API MockInternalProcessorContext (#18103)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../internals/TimeOrderedKeyValueBufferTest.java   |  86 ++++----
 .../metrics/RocksDBBlockCacheMetricsTest.java      |  27 ++-
 .../test/MockInternalNewProcessorContext.java      |  25 ++-
 .../kafka/test/MockInternalProcessorContext.java   | 224 ---------------------
 4 files changed, 83 insertions(+), 279 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 1d81dffdd78..c8b9aaedf50 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,14 +29,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import 
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
 import org.apache.kafka.test.TestUtils;
 
@@ -96,21 +95,21 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         this.bufferSupplier = bufferSupplier;
     }
 
-    private static MockInternalProcessorContext makeContext() {
+    private static MockInternalNewProcessorContext<?, ?> makeContext() {
         final Properties properties = new Properties();
         properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
 
         final TaskId taskId = new TaskId(0, 0);
 
-        final MockInternalProcessorContext context = new 
MockInternalProcessorContext(properties, taskId, TestUtils.tempDirectory());
+        final MockInternalNewProcessorContext<?, ?> context = new 
MockInternalNewProcessorContext<>(properties, taskId, 
TestUtils.tempDirectory());
         context.setRecordCollector(new MockRecordCollector());
 
         return context;
     }
 
 
-    private static void cleanup(final MockInternalProcessorContext context, 
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer) {
+    private static void cleanup(final MockInternalNewProcessorContext<?, ?> 
context, final TimeOrderedKeyValueBuffer<String, String, Change<String>> 
buffer) {
         try {
             buffer.close();
             Utils.delete(context.stateDir());
@@ -124,8 +123,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldInit(final String testName, final Function<String, B> 
bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         cleanup(context, buffer);
     }
 
@@ -134,8 +133,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldAcceptData(final String testName, final Function<String, 
B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
         cleanup(context, buffer);
     }
@@ -145,8 +144,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRejectNullValues(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         try {
             buffer.put(0, new Record<>("asdf", null, 0L), getContext(0));
             fail("expected an exception");
@@ -161,8 +160,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRemoveData(final String testName, final Function<String, 
B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
         assertThat(buffer.numRecords(), is(1));
         buffer.evictWhile(() -> true, kv -> { });
@@ -175,8 +174,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRespectEvictionPredicate(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
         putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
         assertThat(buffer.numRecords(), is(2));
@@ -194,8 +193,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldTrackCount(final String testName, final Function<String, 
B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "oin");
         assertThat(buffer.numRecords(), is(1));
         putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
@@ -210,8 +209,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldTrackSize(final String testName, final Function<String, 
B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
         assertThat(buffer.bufferSize(), is(43L));
         putRecord(buffer, context, 1L, 0L, "asdf", "3l");
@@ -226,8 +225,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldTrackMinTimestamp(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
         assertThat(buffer.minTimestamp(), is(1L));
         putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
@@ -240,8 +239,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp(final 
String testName, final Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
         assertThat(buffer.numRecords(), is(1));
@@ -288,8 +287,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldReturnUndefinedOnPriorValueForNotBufferedKey(final 
String testName, final Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         assertThat(buffer.priorValueForBuffered("ASDF"), 
is(Maybe.undefined()));
     }
@@ -299,8 +298,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldReturnPriorValueForBufferedKey(final String testName, 
final Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final ProcessorRecordContext recordContext = getContext(0L);
         context.setRecordContext(recordContext);
@@ -315,8 +314,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldFlush(final String testName, final Function<String, B> 
bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
         putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
         putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
         putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
@@ -388,8 +387,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRestoreOldUnversionedFormat(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -509,8 +508,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV1Format(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -633,8 +632,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV2Format(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -759,8 +758,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         // V2 header, so we need to be sure to handle this case as well.
         // Note the data is the same as the V3 test.
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -882,8 +881,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldRestoreV3Format(final String testName, final 
Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -1005,8 +1004,8 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     public void shouldNotRestoreUnrecognizedVersionRecord(final String 
testName, final Function<String, B> bufferSupplier) {
         setup(testName, bufferSupplier);
         final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer 
= bufferSupplier.apply(testName);
-        final MockInternalProcessorContext context = makeContext();
-        buffer.init((StateStoreContext) context, buffer);
+        final MockInternalNewProcessorContext<?, ?> context = makeContext();
+        buffer.init(context, buffer);
 
         final RecordBatchingStateRestoreCallback stateRestoreCallback =
             (RecordBatchingStateRestoreCallback) 
context.stateRestoreCallback(testName);
@@ -1039,7 +1038,7 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
     }
 
     private static void putRecord(final TimeOrderedKeyValueBuffer<String, 
String, Change<String>> buffer,
-                                  final MockInternalProcessorContext context,
+                                  final MockInternalNewProcessorContext<?, ?> 
context,
                                   final long streamTime,
                                   final long recordTimestamp,
                                   final String key,
@@ -1049,11 +1048,12 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer<S
         buffer.put(streamTime, new Record<>(key, new Change<>(value, null), 
0L), recordContext);
     }
 
+    @SuppressWarnings("resource")
     private static BufferValue getBufferValue(final String value, final long 
timestamp) {
         return new BufferValue(
             null,
             null,
-            Serdes.String().serializer().serialize(null, value),
+            new StringSerializer().serialize(null, value),
             getContext(timestamp)
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
index 8d189144504..57259cb31c4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
@@ -26,12 +26,13 @@ import 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import 
org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
 import org.apache.kafka.streams.state.internals.RocksDBStore;
 import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.rocksdb.Cache;
 
 import java.io.File;
 import java.io.IOException;
@@ -57,8 +58,8 @@ public class RocksDBBlockCacheMetricsTest {
     public static Stream<Arguments> stores() {
         final File stateDir = TestUtils.tempDirectory("state");
         return Stream.of(
-            Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new 
MockInternalProcessorContext(new Properties(), TASK_ID, stateDir)),
-            Arguments.of(new RocksDBTimestampedStore(STORE_NAME, 
METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID, 
stateDir))
+            Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new 
MockInternalNewProcessorContext<>(new Properties(), TASK_ID, stateDir)),
+            Arguments.of(new RocksDBTimestampedStore(STORE_NAME, 
METRICS_SCOPE), new MockInternalNewProcessorContext<>(new Properties(), 
TASK_ID, stateDir))
         );
     }
 
@@ -79,8 +80,11 @@ public class RocksDBBlockCacheMetricsTest {
     @ParameterizedTest
     @MethodSource("stores")
     public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore 
store, final StateStoreContext ctx) {
-        withStore(store, ctx, () ->
-                assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L)));
+        withStore(
+            store,
+            ctx,
+            () -> assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L))
+        );
     }
 
     @ParameterizedTest
@@ -88,8 +92,10 @@ public class RocksDBBlockCacheMetricsTest {
     public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store, 
final StateStoreContext ctx) {
         withStore(store, ctx, () -> {
             final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = 
(BlockBasedTableConfigWithAccessibleCache) 
store.getOptions().tableFormatConfig();
-            final long usage = tableFormatConfig.blockCache().getUsage();
-            assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+            try (final Cache blockCache = tableFormatConfig.blockCache()) {
+                final long usage = blockCache.getUsage();
+                assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+            }
         });
     }
 
@@ -98,11 +104,14 @@ public class RocksDBBlockCacheMetricsTest {
     public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore 
store, final StateStoreContext ctx) {
         withStore(store, ctx, () -> {
             final BlockBasedTableConfigWithAccessibleCache tableFormatConfig = 
(BlockBasedTableConfigWithAccessibleCache) 
store.getOptions().tableFormatConfig();
-            final long usage = tableFormatConfig.blockCache().getPinnedUsage();
-            assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+            try (final Cache blockCache = tableFormatConfig.blockCache()) {
+                final long usage = blockCache.getPinnedUsage();
+                assertMetric(ctx, STATE_STORE_LEVEL_GROUP, 
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+            }
         });
     }
 
+    @SuppressWarnings("resource")
     public <T> void assertMetric(final StateStoreContext context, final String 
group, final String metricName, final T expected) {
         final StreamsMetricsImpl metrics = 
ProcessorContextUtils.metricsImpl(context);
         final MetricName name = metrics.metricsRegistry().metricName(
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
 
b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
index a3c7194680d..507ac389061 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
@@ -41,12 +41,16 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
 
 import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 
 public class MockInternalNewProcessorContext<KOut, VOut> extends 
MockProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut, 
VOut> {
 
-    private ProcessorNode currentNode;
+    private ProcessorNode<?, ?, ?, ?> currentNode;
+    private RecordCollector recordCollector;
+    private final Map<String, StateRestoreCallback> restoreCallbacks = new 
LinkedHashMap<>();
     private long currentSystemTimeMs;
     private final TaskType taskType = TaskType.ACTIVE;
 
@@ -108,12 +112,12 @@ public class MockInternalNewProcessorContext<KOut, VOut> 
extends MockProcessorCo
     }
 
     @Override
-    public void setCurrentNode(final ProcessorNode currentNode) {
+    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
         this.currentNode = currentNode;
     }
 
     @Override
-    public ProcessorNode currentNode() {
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
         return currentNode;
     }
 
@@ -128,9 +132,19 @@ public class MockInternalNewProcessorContext<KOut, VOut> 
extends MockProcessorCo
     @Override
     public void uninitialize() {}
 
+    @Override
+    public RecordCollector recordCollector() {
+        return recordCollector;
+    }
+
+    public void setRecordCollector(final RecordCollector recordCollector) {
+        this.recordCollector = recordCollector;
+    }
+
     @Override
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback) {
+        restoreCallbacks.put(store.name(), stateRestoreCallback);
         addStateStore(store);
     }
 
@@ -138,9 +152,14 @@ public class MockInternalNewProcessorContext<KOut, VOut> 
extends MockProcessorCo
     public void register(final StateStore store,
                          final StateRestoreCallback stateRestoreCallback,
                          final CommitCallback checkpoint) {
+        restoreCallbacks.put(store.name(), stateRestoreCallback);
         addStateStore(store);
     }
 
+    public StateRestoreCallback stateRestoreCallback(final String storeName) {
+        return restoreCallbacks.get(storeName);
+    }
+
     @Override
     public <K, V> void forward(K key, V value) {
         throw new UnsupportedOperationException("Migrate to new 
implementation");
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
deleted file mode 100644
index 4d4ad0e4dc0..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.CommitCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.FixedKeyRecord;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-import java.io.File;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-
-@SuppressWarnings("deprecation")
-public class MockInternalProcessorContext extends 
org.apache.kafka.streams.processor.MockProcessorContext implements 
InternalProcessorContext<Object, Object> {
-
-    private final Map<String, StateRestoreCallback> restoreCallbacks = new 
LinkedHashMap<>();
-    private ProcessorNode currentNode;
-    private RecordCollector recordCollector;
-    private long currentSystemTimeMs;
-    private final TaskType taskType = TaskType.ACTIVE;
-    private ProcessorMetadata processorMetadata;
-
-    public MockInternalProcessorContext() {
-        processorMetadata = new ProcessorMetadata();
-    }
-
-    public MockInternalProcessorContext(final Properties config, final TaskId 
taskId, final File stateDir) {
-        super(config, taskId, stateDir);
-        processorMetadata = new ProcessorMetadata();
-    }
-
-    @Override
-    public void setSystemTimeMs(long timeMs) {
-        currentSystemTimeMs = timeMs;
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return currentSystemTimeMs;
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return (StreamsMetricsImpl) super.metrics();
-    }
-
-    @Override
-    public <K, V> void forward(final Record<K, V> record) {
-        forward(record.key(), record.value(), 
To.all().withTimestamp(record.timestamp()));
-    }
-
-    @Override
-    public <K, V> void forward(final Record<K, V> record, final String 
childName) {
-        forward(record.key(), record.value(), 
To.child(childName).withTimestamp(record.timestamp()));
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return new ProcessorRecordContext(timestamp(), offset(), partition(), 
topic(), headers());
-    }
-
-    @Override
-    public Optional<RecordMetadata> recordMetadata() {
-        return Optional.of(recordContext());
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        setRecordMetadata(
-            recordContext.topic(),
-            recordContext.partition(),
-            recordContext.offset(),
-            recordContext.headers(),
-            recordContext.timestamp()
-        );
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode currentNode) {
-        this.currentNode = currentNode;
-    }
-
-    @Override
-    public ProcessorNode currentNode() {
-        return currentNode;
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return null;
-    }
-
-    @Override
-    public void initialize() {}
-
-    @Override
-    public void uninitialize() {}
-
-    @Override
-    public RecordCollector recordCollector() {
-        return recordCollector;
-    }
-
-    public void setRecordCollector(final RecordCollector recordCollector) {
-        this.recordCollector = recordCollector;
-    }
-
-    @Override
-    public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback) {
-        restoreCallbacks.put(store.name(), stateRestoreCallback);
-        super.register(store, stateRestoreCallback);
-    }
-
-    @Override
-    public void register(final StateStore store,
-                         final StateRestoreCallback stateRestoreCallback,
-                         final CommitCallback checkpoint) {
-        restoreCallbacks.put(store.name(), stateRestoreCallback);
-        super.register(store, stateRestoreCallback);
-    }
-
-    public StateRestoreCallback stateRestoreCallback(final String storeName) {
-        return restoreCallbacks.get(storeName);
-    }
-
-    @Override
-    public TaskType taskType() {
-        return taskType;
-    }
-
-    @Override
-    public void logChange(final String storeName,
-                          final Bytes key,
-                          final byte[] value,
-                          final long timestamp,
-                          final Position position) {
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final 
RecordCollector recordCollector, final ThreadCache newCache) {
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final 
DirtyEntryFlushListener listener) {
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return "mock-changelog";
-    }
-
-    @Override
-    public void addProcessorMetadataKeyValue(final String key, final long 
value) {
-        processorMetadata.put(key, value);
-    }
-
-    @Override
-    public Long processorMetadataForKey(final String key) {
-        return processorMetadata.get(key);
-    }
-
-    @Override
-    public void setProcessorMetadata(final ProcessorMetadata metadata) {
-        Objects.requireNonNull(metadata);
-        processorMetadata = metadata;
-    }
-
-    @Override
-    public ProcessorMetadata processorMetadata() {
-        return processorMetadata;
-    }
-
-    @Override
-    public <K, V> void forward(final FixedKeyRecord<K, V> record) {
-        forward(new Record<>(record.key(), record.value(), record.timestamp(), 
record.headers()));
-    }
-
-    @Override
-    public <K, V> void forward(final FixedKeyRecord<K, V> record, final String 
childName) {
-        forward(
-            new Record<>(record.key(), record.value(), record.timestamp(), 
record.headers()),
-            childName
-        );
-    }
-}
\ No newline at end of file

Reply via email to