This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3b40bb77b6 KAFKA-13722: Refactor Kafka Streams store interfaces (#18243)
b3b40bb77b6 is described below

commit b3b40bb77b60fae323bb98100be59d003e27a986
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 19 11:55:57 2024 -0800

    KAFKA-13722: Refactor Kafka Streams store interfaces (#18243)
    
    Refactor Segments and TimestampedSegments to not use old
    ProcessorContext any longer.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../streams/state/internals/AbstractSegments.java  |  6 ++---
 .../streams/state/internals/KeyValueSegments.java  |  8 +++----
 .../state/internals/LogicalKeyValueSegments.java   | 10 ++++----
 .../state/internals/RocksDBVersionedStore.java     |  7 +++---
 .../RocksDBVersionedStoreRestoreWriteBuffer.java   |  6 ++---
 .../kafka/streams/state/internals/Segments.java    |  8 +++----
 .../state/internals/TimestampedSegments.java       |  8 +++----
 .../streams/state/KeyValueStoreTestDriver.java     | 28 ++++++++++++----------
 .../state/internals/KeyValueSegmentTest.java       |  9 +------
 .../state/internals/TimestampedSegmentTest.java    |  9 +------
 10 files changed, 43 insertions(+), 56 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 5611fe99d24..4f7ca5e59ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.query.Position;
 
 import org.slf4j.Logger;
@@ -81,7 +81,7 @@ abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
 
     @Override
     public S getOrCreateSegmentIfLive(final long segmentId,
-                                      final ProcessorContext context,
+                                      final StateStoreContext context,
                                       final long streamTime) {
         final long minLiveTimestamp = streamTime - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
@@ -95,7 +95,7 @@ abstract class AbstractSegments<S extends Segment> implements 
Segments<S> {
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long 
streamTime) {
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index 304d77e8259..a18d901b83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -37,7 +37,7 @@ class KeyValueSegments extends 
AbstractSegments<KeyValueSegment> {
 
     @Override
     public KeyValueSegment getOrCreateSegment(final long segmentId,
-                                              final ProcessorContext context) {
+                                              final StateStoreContext context) 
{
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@ class KeyValueSegments extends 
AbstractSegments<KeyValueSegment> {
 
     @Override
     public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final ProcessorContext 
context,
+                                                    final StateStoreContext 
context,
                                                     final long streamTime) {
         final KeyValueSegment segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class KeyValueSegments extends 
AbstractSegments<KeyValueSegment> {
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long 
streamTime) {
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
         super.openExisting(context, streamTime);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index bcbeb4689b3..c46a2c2788c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -29,8 +29,8 @@ import java.util.Map;
  * Regular segments with {@code segmentId >= 0} expire according to the 
specified
  * retention period. "Reserved" segments with {@code segmentId < 0} do not 
expire
  * and are completely separate from regular segments in that methods such as
- * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, 
ProcessorContext)},
- * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, 
StateStoreContext)},
+ * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)},
  * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
  * only return regular segments and not reserved segments. The methods {@link 
#flush()}
  * and {@link #close()} flush and close both regular and reserved segments, 
due to
@@ -62,7 +62,7 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
 
     @Override
     public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
-                                                     final ProcessorContext 
context) {
+                                                     final StateStoreContext 
context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -103,7 +103,7 @@ public class LogicalKeyValueSegments extends 
AbstractSegments<LogicalKeyValueSeg
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long 
streamTime) {
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
         physicalStore.openDB(context.appConfigs(), context.stateDir());
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index eaaed6f30e3..54580a26a1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import 
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
@@ -102,7 +101,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
     private final RocksDBVersionedStoreClient versionedStoreClient;
     private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
 
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -489,7 +488,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         /**
          * @return the segment with the provided id, or {@code null} if the 
segment is expired
          */
-        T getOrCreateSegmentIfLive(long segmentId, ProcessorContext context, 
long streamTime);
+        T getOrCreateSegmentIfLive(long segmentId, StateStoreContext context, 
long streamTime);
 
         /**
          * @return all segments in the store which contain timestamps at least 
the provided
@@ -525,7 +524,7 @@ public class RocksDBVersionedStore implements 
VersionedKeyValueStore<Bytes, byte
         }
 
         @Override
-        public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long 
segmentId, final ProcessorContext context, final long streamTime) {
+        public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long 
segmentId, final StateStoreContext context, final long streamTime) {
             return segmentStores.getOrCreateSegmentIfLive(segmentId, context, 
streamTime);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
index bd82465ec49..e39c1193b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
 import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
 import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
@@ -91,7 +91,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         // older segments/stores before later ones
         try (final WriteBatch segmentsBatch = new WriteBatch()) {
             final List<WriteBufferSegmentWithDbFallback> allSegments = 
restoreClient.reversedSegments(Long.MIN_VALUE);
-            if (allSegments.size() > 0) {
+            if (!allSegments.isEmpty()) {
                 // collect entries into write batch
                 for (final WriteBufferSegmentWithDbFallback bufferSegment : 
allSegments) {
                     final LogicalKeyValueSegment dbSegment = 
bufferSegment.dbSegment();
@@ -206,7 +206,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         }
 
         @Override
-        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final 
long segmentId, final ProcessorContext context, final long streamTime) {
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final 
long segmentId, final StateStoreContext context, final long streamTime) {
             if (segmentsWriteBuffer.containsKey(segmentId)) {
                 return segmentsWriteBuffer.get(segmentId);
             }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index a3ef2426c3c..18086a5441b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 
 import java.util.List;
 
@@ -28,11 +28,11 @@ interface Segments<S extends Segment> {
 
     S segmentForTimestamp(final long timestamp);
 
-    S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext 
context, final long streamTime);
+    S getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext 
context, final long streamTime);
 
-    S getOrCreateSegment(final long segmentId, final ProcessorContext context);
+    S getOrCreateSegment(final long segmentId, final StateStoreContext 
context);
 
-    void openExisting(final ProcessorContext context, final long streamTime);
+    void openExisting(final StateStoreContext context, final long streamTime);
 
     List<S> segments(final long timeFrom, final long timeTo, final boolean 
forward);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 597d7bf0ce0..70fae503060 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -37,7 +37,7 @@ class TimestampedSegments extends 
AbstractSegments<TimestampedSegment> {
 
     @Override
     public TimestampedSegment getOrCreateSegment(final long segmentId,
-                                                 final ProcessorContext 
context) {
+                                                 final StateStoreContext 
context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@ class TimestampedSegments extends 
AbstractSegments<TimestampedSegment> {
 
     @Override
     public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final ProcessorContext 
context,
+                                                    final StateStoreContext 
context,
                                                     final long streamTime) {
         final TimestampedSegment segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class TimestampedSegments extends 
AbstractSegments<TimestampedSegment> {
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long 
streamTime) {
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
         super.openExisting(context, streamTime);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0de1e1e606a..fdfed28ece2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,12 +27,13 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -62,7 +63,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * A component that provides a {@link #context() ProcessingContext} that can 
be supplied to a {@link KeyValueStore} so that
+ * A component that provides a {@link #context() StateStoreContext} that can 
be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link 
KeyValueStore#flush()} are captured for testing purposes.
  * This class simplifies testing of various {@link KeyValueStore} instances, 
especially those that use
  * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka 
topic.
@@ -110,7 +111,7 @@ import static org.mockito.Mockito.when;
  *
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} 
implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) 
registers itself} with the {@link ProcessorContext}, so that
+ * {@link StateStoreContext#register(StateStore, StateRestoreCallback) 
registers itself} with the {@link StateStoreContext}, so that
  * the persisted contents of a store are properly restored from the flushed 
entries when the store instance is started.
  * <p>
  * To do this, create an instance of this driver component, {@link 
#addEntryToRestoreLog(Object, Object) add entries} that will be
@@ -149,7 +150,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Create a driver object that will have a {@link #context()} that records 
messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store 
and that provides default serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and 
that provides default serializers and
      * deserializers for the given built-in key and value types (e.g., {@code 
String.class}, {@code Integer.class},
      * {@code Long.class}, and {@code byte[].class}). This can be used when 
store is created to rely upon the
      * ProcessorContext's default key and value serializers and deserializers.
@@ -167,14 +168,14 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Create a driver object that will have a {@link #context()} that records 
messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store 
and that provides the specified serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and 
that provides the specified serializers and
      * deserializers. This can be used when store is created to rely upon the 
ProcessorContext's default key and value serializers
      * and deserializers.
      *
-     * @param keySerializer     the key serializer for the {@link 
ProcessorContext}; may not be null
-     * @param keyDeserializer   the key deserializer for the {@link 
ProcessorContext}; may not be null
-     * @param valueSerializer   the value serializer for the {@link 
ProcessorContext}; may not be null
-     * @param valueDeserializer the value deserializer for the {@link 
ProcessorContext}; may not be null
+     * @param keySerializer     the key serializer for the {@link 
StateStoreContext}; may not be null
+     * @param keyDeserializer   the key deserializer for the {@link 
StateStoreContext}; may not be null
+     * @param valueSerializer   the value serializer for the {@link 
StateStoreContext}; may not be null
+     * @param valueDeserializer the value deserializer for the {@link 
StateStoreContext}; may not be null
      * @return the test driver; never null
      */
     public static <K, V> KeyValueStoreTestDriver<K, V> create(final 
Serializer<K> keySerializer,
@@ -195,6 +196,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final InternalMockProcessorContext<?, ?> context;
     private final StateSerdes<K, V> stateSerdes;
 
+    @SuppressWarnings("resource")
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -264,7 +266,7 @@ public class KeyValueStoreTestDriver<K, V> {
         stateDir.mkdirs();
         stateSerdes = serdes;
 
-        context = new InternalMockProcessorContext<Object, Object>(stateDir, 
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new InternalMockProcessorContext<>(stateDir, 
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             final ThreadCache cache = new ThreadCache(new 
LogContext("testCache "), 1024 * 1024L, metrics());
 
             @Override
@@ -298,7 +300,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Get the entries that are restored to a KeyValueStore when it is 
constructed with this driver's {@link #context()
-     * ProcessorContext}.
+     * StateStoreContext}.
      *
      * @return the restore entries; never null but possibly a null iterator
      */
@@ -345,7 +347,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * {@link #flushedEntryRemoved(Object)} methods.
      * <p>
      * If the {@link KeyValueStore}'s are to be restored upon its startup, be 
sure to {@link #addEntryToRestoreLog(Object, Object)
-     * add the restore entries} before creating the store with the {@link 
ProcessorContext} returned by this method.
+     * add the restore entries} before creating the store with the {@link 
StateStoreContext} returned by this method.
      *
      * @return the processing context; never null
      * @see #addEntryToRestoreLog(Object, Object)
@@ -378,7 +380,7 @@ public class KeyValueStoreTestDriver<K, V> {
     /**
      * Utility method to compute the number of entries within the store.
      *
-     * @param store the key value store using this {@link #context()}.
+     * @param store the key value store using this {@link #context() 
StateStoreContext}.
      * @return the number of entries
      */
     public int sizeOf(final KeyValueStore<K, V> store) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index 119bda69c9f..e71704f32af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class KeyValueSegmentTest {
         final String directoryPath = 
TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
 
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        
when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG,
 "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), 
directory);
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", 
"segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 82a76ba13a6..633d14c1e63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class TimestampedSegmentTest {
         final String directoryPath = 
TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
 
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        
when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG,
 "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), 
directory);
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", 
"segment").exists());

Reply via email to