This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b3b40bb77b6 KAFKA-13722: Refactor Kafka Streams store interfaces
(#18243)
b3b40bb77b6 is described below
commit b3b40bb77b60fae323bb98100be59d003e27a986
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 19 11:55:57 2024 -0800
KAFKA-13722: Refactor Kafka Streams store interfaces (#18243)
Refactor Segments and TimestampedSegments so that they no longer use the
old ProcessorContext; their methods now take a StateStoreContext instead.
Reviewers: Bruno Cadonna <[email protected]>
---
.../streams/state/internals/AbstractSegments.java | 6 ++---
.../streams/state/internals/KeyValueSegments.java | 8 +++----
.../state/internals/LogicalKeyValueSegments.java | 10 ++++----
.../state/internals/RocksDBVersionedStore.java | 7 +++---
.../RocksDBVersionedStoreRestoreWriteBuffer.java | 6 ++---
.../kafka/streams/state/internals/Segments.java | 8 +++----
.../state/internals/TimestampedSegments.java | 8 +++----
.../streams/state/KeyValueStoreTestDriver.java | 28 ++++++++++++----------
.../state/internals/KeyValueSegmentTest.java | 9 +------
.../state/internals/TimestampedSegmentTest.java | 9 +------
10 files changed, 43 insertions(+), 56 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 5611fe99d24..4f7ca5e59ae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
import org.slf4j.Logger;
@@ -81,7 +81,7 @@ abstract class AbstractSegments<S extends Segment> implements
Segments<S> {
@Override
public S getOrCreateSegmentIfLive(final long segmentId,
- final ProcessorContext context,
+ final StateStoreContext context,
final long streamTime) {
final long minLiveTimestamp = streamTime - retentionPeriod;
final long minLiveSegment = segmentId(minLiveTimestamp);
@@ -95,7 +95,7 @@ abstract class AbstractSegments<S extends Segment> implements
Segments<S> {
}
@Override
- public void openExisting(final ProcessorContext context, final long
streamTime) {
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
try {
final File dir = new File(context.stateDir(), name);
if (dir.exists()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index 304d77e8259..a18d901b83f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -37,7 +37,7 @@ class KeyValueSegments extends
AbstractSegments<KeyValueSegment> {
@Override
public KeyValueSegment getOrCreateSegment(final long segmentId,
- final ProcessorContext context) {
+ final StateStoreContext context)
{
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
@@ -55,7 +55,7 @@ class KeyValueSegments extends
AbstractSegments<KeyValueSegment> {
@Override
public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
- final ProcessorContext
context,
+ final StateStoreContext
context,
final long streamTime) {
final KeyValueSegment segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class KeyValueSegments extends
AbstractSegments<KeyValueSegment> {
}
@Override
- public void openExisting(final ProcessorContext context, final long
streamTime) {
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
super.openExisting(context, streamTime);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index bcbeb4689b3..c46a2c2788c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -29,8 +29,8 @@ import java.util.Map;
* Regular segments with {@code segmentId >= 0} expire according to the
specified
* retention period. "Reserved" segments with {@code segmentId < 0} do not
expire
* and are completely separate from regular segments in that methods such as
- * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long,
ProcessorContext)},
- * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long,
StateStoreContext)},
+ * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)},
* {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
* only return regular segments and not reserved segments. The methods {@link
#flush()}
* and {@link #close()} flush and close both regular and reserved segments,
due to
@@ -62,7 +62,7 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
@Override
public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
- final ProcessorContext
context) {
+ final StateStoreContext
context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
@@ -103,7 +103,7 @@ public class LogicalKeyValueSegments extends
AbstractSegments<LogicalKeyValueSeg
}
@Override
- public void openExisting(final ProcessorContext context, final long
streamTime) {
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
physicalStore.openDB(context.appConfigs(), context.stateDir());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index eaaed6f30e3..54580a26a1b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import
org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
@@ -102,7 +101,7 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
private final RocksDBVersionedStoreClient versionedStoreClient;
private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
- private InternalProcessorContext internalProcessorContext;
+ private InternalProcessorContext<?, ?> internalProcessorContext;
private Sensor expiredRecordSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
private boolean consistencyEnabled = false;
@@ -489,7 +488,7 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
/**
* @return the segment with the provided id, or {@code null} if the
segment is expired
*/
- T getOrCreateSegmentIfLive(long segmentId, ProcessorContext context,
long streamTime);
+ T getOrCreateSegmentIfLive(long segmentId, StateStoreContext context,
long streamTime);
/**
* @return all segments in the store which contain timestamps at least
the provided
@@ -525,7 +524,7 @@ public class RocksDBVersionedStore implements
VersionedKeyValueStore<Bytes, byte
}
@Override
- public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long
segmentId, final ProcessorContext context, final long streamTime) {
+ public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long
segmentId, final StateStoreContext context, final long streamTime) {
return segmentStores.getOrCreateSegmentIfLive(segmentId, context,
streamTime);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
index bd82465ec49..e39c1193b48 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
import
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
import
org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
@@ -91,7 +91,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
// older segments/stores before later ones
try (final WriteBatch segmentsBatch = new WriteBatch()) {
final List<WriteBufferSegmentWithDbFallback> allSegments =
restoreClient.reversedSegments(Long.MIN_VALUE);
- if (allSegments.size() > 0) {
+ if (!allSegments.isEmpty()) {
// collect entries into write batch
for (final WriteBufferSegmentWithDbFallback bufferSegment :
allSegments) {
final LogicalKeyValueSegment dbSegment =
bufferSegment.dbSegment();
@@ -206,7 +206,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
}
@Override
- public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final
long segmentId, final ProcessorContext context, final long streamTime) {
+ public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final
long segmentId, final StateStoreContext context, final long streamTime) {
if (segmentsWriteBuffer.containsKey(segmentId)) {
return segmentsWriteBuffer.get(segmentId);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index a3ef2426c3c..18086a5441b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import java.util.List;
@@ -28,11 +28,11 @@ interface Segments<S extends Segment> {
S segmentForTimestamp(final long timestamp);
- S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext
context, final long streamTime);
+ S getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext
context, final long streamTime);
- S getOrCreateSegment(final long segmentId, final ProcessorContext context);
+ S getOrCreateSegment(final long segmentId, final StateStoreContext
context);
- void openExisting(final ProcessorContext context, final long streamTime);
+ void openExisting(final StateStoreContext context, final long streamTime);
List<S> segments(final long timeFrom, final long timeTo, final boolean
forward);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 597d7bf0ce0..70fae503060 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -37,7 +37,7 @@ class TimestampedSegments extends
AbstractSegments<TimestampedSegment> {
@Override
public TimestampedSegment getOrCreateSegment(final long segmentId,
- final ProcessorContext
context) {
+ final StateStoreContext
context) {
if (segments.containsKey(segmentId)) {
return segments.get(segmentId);
} else {
@@ -55,7 +55,7 @@ class TimestampedSegments extends
AbstractSegments<TimestampedSegment> {
@Override
public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
- final ProcessorContext
context,
+ final StateStoreContext
context,
final long streamTime) {
final TimestampedSegment segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class TimestampedSegments extends
AbstractSegments<TimestampedSegment> {
}
@Override
- public void openExisting(final ProcessorContext context, final long
streamTime) {
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
super.openExisting(context, streamTime);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0de1e1e606a..fdfed28ece2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,12 +27,13 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -62,7 +63,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
- * A component that provides a {@link #context() ProcessingContext} that can
be supplied to a {@link KeyValueStore} so that
+ * A component that provides a {@link #context() StateStoreContext} that can
be supplied to a {@link KeyValueStore} so that
* all entries written to the Kafka topic by the store during {@link
KeyValueStore#flush()} are captured for testing purposes.
* This class simplifies testing of various {@link KeyValueStore} instances,
especially those that use
* {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka
topic.
@@ -110,7 +111,7 @@ import static org.mockito.Mockito.when;
*
* <h2>Restoring a store</h2>
* This component can be used to test whether a {@link KeyValueStore}
implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback)
registers itself} with the {@link ProcessorContext}, so that
+ * {@link StateStoreContext#register(StateStore, StateRestoreCallback)
registers itself} with the {@link StateStoreContext}, so that
* the persisted contents of a store are properly restored from the flushed
entries when the store instance is started.
* <p>
* To do this, create an instance of this driver component, {@link
#addEntryToRestoreLog(Object, Object) add entries} that will be
@@ -149,7 +150,7 @@ public class KeyValueStoreTestDriver<K, V> {
/**
* Create a driver object that will have a {@link #context()} that records
messages
- * {@link ProcessorContext#forward(Object, Object) forwarded} by the store
and that provides default serializers and
+ * {@link ProcessorContext#forward(Record) forwarded} by the store and
that provides default serializers and
* deserializers for the given built-in key and value types (e.g., {@code
String.class}, {@code Integer.class},
* {@code Long.class}, and {@code byte[].class}). This can be used when
store is created to rely upon the
* ProcessorContext's default key and value serializers and deserializers.
@@ -167,14 +168,14 @@ public class KeyValueStoreTestDriver<K, V> {
/**
* Create a driver object that will have a {@link #context()} that records
messages
- * {@link ProcessorContext#forward(Object, Object) forwarded} by the store
and that provides the specified serializers and
+ * {@link ProcessorContext#forward(Record) forwarded} by the store and
that provides the specified serializers and
* deserializers. This can be used when store is created to rely upon the
ProcessorContext's default key and value serializers
* and deserializers.
*
- * @param keySerializer the key serializer for the {@link
ProcessorContext}; may not be null
- * @param keyDeserializer the key deserializer for the {@link
ProcessorContext}; may not be null
- * @param valueSerializer the value serializer for the {@link
ProcessorContext}; may not be null
- * @param valueDeserializer the value deserializer for the {@link
ProcessorContext}; may not be null
+ * @param keySerializer the key serializer for the {@link
StateStoreContext}; may not be null
+ * @param keyDeserializer the key deserializer for the {@link
StateStoreContext}; may not be null
+ * @param valueSerializer the value serializer for the {@link
StateStoreContext}; may not be null
+ * @param valueDeserializer the value deserializer for the {@link
StateStoreContext}; may not be null
* @return the test driver; never null
*/
public static <K, V> KeyValueStoreTestDriver<K, V> create(final
Serializer<K> keySerializer,
@@ -195,6 +196,7 @@ public class KeyValueStoreTestDriver<K, V> {
private final InternalMockProcessorContext<?, ?> context;
private final StateSerdes<K, V> stateSerdes;
+ @SuppressWarnings("resource")
private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -264,7 +266,7 @@ public class KeyValueStoreTestDriver<K, V> {
stateDir.mkdirs();
stateSerdes = serdes;
- context = new InternalMockProcessorContext<Object, Object>(stateDir,
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+ context = new InternalMockProcessorContext<>(stateDir,
serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
final ThreadCache cache = new ThreadCache(new
LogContext("testCache "), 1024 * 1024L, metrics());
@Override
@@ -298,7 +300,7 @@ public class KeyValueStoreTestDriver<K, V> {
/**
* Get the entries that are restored to a KeyValueStore when it is
constructed with this driver's {@link #context()
- * ProcessorContext}.
+ * StateStoreContext}.
*
* @return the restore entries; never null but possibly a null iterator
*/
@@ -345,7 +347,7 @@ public class KeyValueStoreTestDriver<K, V> {
* {@link #flushedEntryRemoved(Object)} methods.
* <p>
* If the {@link KeyValueStore}'s are to be restored upon its startup, be
sure to {@link #addEntryToRestoreLog(Object, Object)
- * add the restore entries} before creating the store with the {@link
ProcessorContext} returned by this method.
+ * add the restore entries} before creating the store with the {@link
StateStoreContext} returned by this method.
*
* @return the processing context; never null
* @see #addEntryToRestoreLog(Object, Object)
@@ -378,7 +380,7 @@ public class KeyValueStoreTestDriver<K, V> {
/**
* Utility method to compute the number of entries within the store.
*
- * @param store the key value store using this {@link #context()}.
+ * @param store the key value store using this {@link #context()
StateStoreContext}.
* @return the number of entries
*/
public int sizeOf(final KeyValueStore<K, V> store) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index 119bda69c9f..e71704f32af 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class KeyValueSegmentTest {
final String directoryPath =
TestUtils.tempDirectory().getAbsolutePath();
final File directory = new File(directoryPath);
- final ProcessorContext mockContext = mock(ProcessorContext.class);
-
when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG,
"INFO")));
- when(mockContext.stateDir()).thenReturn(directory);
-
- segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+ segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")),
directory);
assertTrue(new File(directoryPath, "window").exists());
assertTrue(new File(directoryPath + File.separator + "window",
"segment").exists());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 82a76ba13a6..633d14c1e63 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class TimestampedSegmentTest {
final String directoryPath =
TestUtils.tempDirectory().getAbsolutePath();
final File directory = new File(directoryPath);
- final ProcessorContext mockContext = mock(ProcessorContext.class);
-
when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG,
"INFO")));
- when(mockContext.stateDir()).thenReturn(directory);
-
- segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+ segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")),
directory);
assertTrue(new File(directoryPath, "window").exists());
assertTrue(new File(directoryPath + File.separator + "window",
"segment").exists());