This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 990c8c750c5 MINOR: remove old procesor API
MockInternalProcessorContext (#18103)
990c8c750c5 is described below
commit 990c8c750c51c0947a2242313066a26d1262ac9d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Dec 11 15:09:13 2024 -0800
MINOR: remove old procesor API MockInternalProcessorContext (#18103)
Reviewers: Bill Bejeck <[email protected]>
---
.../internals/TimeOrderedKeyValueBufferTest.java | 86 ++++----
.../metrics/RocksDBBlockCacheMetricsTest.java | 27 ++-
.../test/MockInternalNewProcessorContext.java | 25 ++-
.../kafka/test/MockInternalProcessorContext.java | 224 ---------------------
4 files changed, 83 insertions(+), 279 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
index 1d81dffdd78..c8b9aaedf50 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java
@@ -29,14 +29,13 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.internals.Change;
-import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import
org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
@@ -96,21 +95,21 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
this.bufferSupplier = bufferSupplier;
}
- private static MockInternalProcessorContext makeContext() {
+ private static MockInternalNewProcessorContext<?, ?> makeContext() {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
final TaskId taskId = new TaskId(0, 0);
- final MockInternalProcessorContext context = new
MockInternalProcessorContext(properties, taskId, TestUtils.tempDirectory());
+ final MockInternalNewProcessorContext<?, ?> context = new
MockInternalNewProcessorContext<>(properties, taskId,
TestUtils.tempDirectory());
context.setRecordCollector(new MockRecordCollector());
return context;
}
- private static void cleanup(final MockInternalProcessorContext context,
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer) {
+ private static void cleanup(final MockInternalNewProcessorContext<?, ?>
context, final TimeOrderedKeyValueBuffer<String, String, Change<String>>
buffer) {
try {
buffer.close();
Utils.delete(context.stateDir());
@@ -124,8 +123,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldInit(final String testName, final Function<String, B>
bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
cleanup(context, buffer);
}
@@ -134,8 +133,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldAcceptData(final String testName, final Function<String,
B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");
cleanup(context, buffer);
}
@@ -145,8 +144,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRejectNullValues(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
try {
buffer.put(0, new Record<>("asdf", null, 0L), getContext(0));
fail("expected an exception");
@@ -161,8 +160,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRemoveData(final String testName, final Function<String,
B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
assertThat(buffer.numRecords(), is(1));
buffer.evictWhile(() -> true, kv -> { });
@@ -175,8 +174,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRespectEvictionPredicate(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
assertThat(buffer.numRecords(), is(2));
@@ -194,8 +193,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldTrackCount(final String testName, final Function<String,
B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "oin");
assertThat(buffer.numRecords(), is(1));
putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
@@ -210,8 +209,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldTrackSize(final String testName, final Function<String,
B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
assertThat(buffer.bufferSize(), is(43L));
putRecord(buffer, context, 1L, 0L, "asdf", "3l");
@@ -226,8 +225,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldTrackMinTimestamp(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
assertThat(buffer.minTimestamp(), is(1L));
putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
@@ -240,8 +239,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp(final
String testName, final Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
assertThat(buffer.numRecords(), is(1));
@@ -288,8 +287,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldReturnUndefinedOnPriorValueForNotBufferedKey(final
String testName, final Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
assertThat(buffer.priorValueForBuffered("ASDF"),
is(Maybe.undefined()));
}
@@ -299,8 +298,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldReturnPriorValueForBufferedKey(final String testName,
final Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final ProcessorRecordContext recordContext = getContext(0L);
context.setRecordContext(recordContext);
@@ -315,8 +314,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldFlush(final String testName, final Function<String, B>
bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");
@@ -388,8 +387,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRestoreOldUnversionedFormat(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -509,8 +508,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRestoreV1Format(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -633,8 +632,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRestoreV2Format(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -759,8 +758,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
// V2 header, so we need to be sure to handle this case as well.
// Note the data is the same as the V3 test.
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -882,8 +881,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldRestoreV3Format(final String testName, final
Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -1005,8 +1004,8 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
public void shouldNotRestoreUnrecognizedVersionRecord(final String
testName, final Function<String, B> bufferSupplier) {
setup(testName, bufferSupplier);
final TimeOrderedKeyValueBuffer<String, String, Change<String>> buffer
= bufferSupplier.apply(testName);
- final MockInternalProcessorContext context = makeContext();
- buffer.init((StateStoreContext) context, buffer);
+ final MockInternalNewProcessorContext<?, ?> context = makeContext();
+ buffer.init(context, buffer);
final RecordBatchingStateRestoreCallback stateRestoreCallback =
(RecordBatchingStateRestoreCallback)
context.stateRestoreCallback(testName);
@@ -1039,7 +1038,7 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
}
private static void putRecord(final TimeOrderedKeyValueBuffer<String,
String, Change<String>> buffer,
- final MockInternalProcessorContext context,
+ final MockInternalNewProcessorContext<?, ?>
context,
final long streamTime,
final long recordTimestamp,
final String key,
@@ -1049,11 +1048,12 @@ public class TimeOrderedKeyValueBufferTest<B extends
TimeOrderedKeyValueBuffer<S
buffer.put(streamTime, new Record<>(key, new Change<>(value, null),
0L), recordContext);
}
+ @SuppressWarnings("resource")
private static BufferValue getBufferValue(final String value, final long
timestamp) {
return new BufferValue(
null,
null,
- Serdes.String().serializer().serialize(null, value),
+ new StringSerializer().serialize(null, value),
getContext(timestamp)
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
index 8d189144504..57259cb31c4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBBlockCacheMetricsTest.java
@@ -26,12 +26,13 @@ import
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import
org.apache.kafka.streams.state.internals.BlockBasedTableConfigWithAccessibleCache;
import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.internals.RocksDBTimestampedStore;
-import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.rocksdb.Cache;
import java.io.File;
import java.io.IOException;
@@ -57,8 +58,8 @@ public class RocksDBBlockCacheMetricsTest {
public static Stream<Arguments> stores() {
final File stateDir = TestUtils.tempDirectory("state");
return Stream.of(
- Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new
MockInternalProcessorContext(new Properties(), TASK_ID, stateDir)),
- Arguments.of(new RocksDBTimestampedStore(STORE_NAME,
METRICS_SCOPE), new MockInternalProcessorContext(new Properties(), TASK_ID,
stateDir))
+ Arguments.of(new RocksDBStore(STORE_NAME, METRICS_SCOPE), new
MockInternalNewProcessorContext<>(new Properties(), TASK_ID, stateDir)),
+ Arguments.of(new RocksDBTimestampedStore(STORE_NAME,
METRICS_SCOPE), new MockInternalNewProcessorContext<>(new Properties(),
TASK_ID, stateDir))
);
}
@@ -79,8 +80,11 @@ public class RocksDBBlockCacheMetricsTest {
@ParameterizedTest
@MethodSource("stores")
public void shouldRecordCorrectBlockCacheCapacity(final RocksDBStore
store, final StateStoreContext ctx) {
- withStore(store, ctx, () ->
- assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L)));
+ withStore(
+ store,
+ ctx,
+ () -> assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.CAPACITY_OF_BLOCK_CACHE, BigInteger.valueOf(50 * 1024 * 1024L))
+ );
}
@ParameterizedTest
@@ -88,8 +92,10 @@ public class RocksDBBlockCacheMetricsTest {
public void shouldRecordCorrectBlockCacheUsage(final RocksDBStore store,
final StateStoreContext ctx) {
withStore(store, ctx, () -> {
final BlockBasedTableConfigWithAccessibleCache tableFormatConfig =
(BlockBasedTableConfigWithAccessibleCache)
store.getOptions().tableFormatConfig();
- final long usage = tableFormatConfig.blockCache().getUsage();
- assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+ try (final Cache blockCache = tableFormatConfig.blockCache()) {
+ final long usage = blockCache.getUsage();
+ assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+ }
});
}
@@ -98,11 +104,14 @@ public class RocksDBBlockCacheMetricsTest {
public void shouldRecordCorrectBlockCachePinnedUsage(final RocksDBStore
store, final StateStoreContext ctx) {
withStore(store, ctx, () -> {
final BlockBasedTableConfigWithAccessibleCache tableFormatConfig =
(BlockBasedTableConfigWithAccessibleCache)
store.getOptions().tableFormatConfig();
- final long usage = tableFormatConfig.blockCache().getPinnedUsage();
- assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+ try (final Cache blockCache = tableFormatConfig.blockCache()) {
+ final long usage = blockCache.getPinnedUsage();
+ assertMetric(ctx, STATE_STORE_LEVEL_GROUP,
RocksDBMetrics.PINNED_USAGE_OF_BLOCK_CACHE, BigInteger.valueOf(usage));
+ }
});
}
+ @SuppressWarnings("resource")
public <T> void assertMetric(final StateStoreContext context, final String
group, final String metricName, final T expected) {
final StreamsMetricsImpl metrics =
ProcessorContextUtils.metricsImpl(context);
final MetricName name = metrics.metricsRegistry().metricName(
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
index a3c7194680d..507ac389061 100644
---
a/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
@@ -41,12 +41,16 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
import java.io.File;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.Objects;
import java.util.Properties;
public class MockInternalNewProcessorContext<KOut, VOut> extends
MockProcessorContext<KOut, VOut> implements InternalProcessorContext<KOut,
VOut> {
- private ProcessorNode currentNode;
+ private ProcessorNode<?, ?, ?, ?> currentNode;
+ private RecordCollector recordCollector;
+ private final Map<String, StateRestoreCallback> restoreCallbacks = new
LinkedHashMap<>();
private long currentSystemTimeMs;
private final TaskType taskType = TaskType.ACTIVE;
@@ -108,12 +112,12 @@ public class MockInternalNewProcessorContext<KOut, VOut>
extends MockProcessorCo
}
@Override
- public void setCurrentNode(final ProcessorNode currentNode) {
+ public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
this.currentNode = currentNode;
}
@Override
- public ProcessorNode currentNode() {
+ public ProcessorNode<?, ?, ?, ?> currentNode() {
return currentNode;
}
@@ -128,9 +132,19 @@ public class MockInternalNewProcessorContext<KOut, VOut>
extends MockProcessorCo
@Override
public void uninitialize() {}
+ @Override
+ public RecordCollector recordCollector() {
+ return recordCollector;
+ }
+
+ public void setRecordCollector(final RecordCollector recordCollector) {
+ this.recordCollector = recordCollector;
+ }
+
@Override
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallback) {
+ restoreCallbacks.put(store.name(), stateRestoreCallback);
addStateStore(store);
}
@@ -138,9 +152,14 @@ public class MockInternalNewProcessorContext<KOut, VOut>
extends MockProcessorCo
public void register(final StateStore store,
final StateRestoreCallback stateRestoreCallback,
final CommitCallback checkpoint) {
+ restoreCallbacks.put(store.name(), stateRestoreCallback);
addStateStore(store);
}
+ public StateRestoreCallback stateRestoreCallback(final String storeName) {
+ return restoreCallbacks.get(storeName);
+ }
+
@Override
public <K, V> void forward(K key, V value) {
throw new UnsupportedOperationException("Migrate to new
implementation");
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
deleted file mode 100644
index 4d4ad0e4dc0..00000000000
---
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.test;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.CommitCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.FixedKeyRecord;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.api.RecordMetadata;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.query.Position;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-import java.io.File;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-
-@SuppressWarnings("deprecation")
-public class MockInternalProcessorContext extends
org.apache.kafka.streams.processor.MockProcessorContext implements
InternalProcessorContext<Object, Object> {
-
- private final Map<String, StateRestoreCallback> restoreCallbacks = new
LinkedHashMap<>();
- private ProcessorNode currentNode;
- private RecordCollector recordCollector;
- private long currentSystemTimeMs;
- private final TaskType taskType = TaskType.ACTIVE;
- private ProcessorMetadata processorMetadata;
-
- public MockInternalProcessorContext() {
- processorMetadata = new ProcessorMetadata();
- }
-
- public MockInternalProcessorContext(final Properties config, final TaskId
taskId, final File stateDir) {
- super(config, taskId, stateDir);
- processorMetadata = new ProcessorMetadata();
- }
-
- @Override
- public void setSystemTimeMs(long timeMs) {
- currentSystemTimeMs = timeMs;
- }
-
- @Override
- public long currentSystemTimeMs() {
- return currentSystemTimeMs;
- }
-
- @Override
- public StreamsMetricsImpl metrics() {
- return (StreamsMetricsImpl) super.metrics();
- }
-
- @Override
- public <K, V> void forward(final Record<K, V> record) {
- forward(record.key(), record.value(),
To.all().withTimestamp(record.timestamp()));
- }
-
- @Override
- public <K, V> void forward(final Record<K, V> record, final String
childName) {
- forward(record.key(), record.value(),
To.child(childName).withTimestamp(record.timestamp()));
- }
-
- @Override
- public ProcessorRecordContext recordContext() {
- return new ProcessorRecordContext(timestamp(), offset(), partition(),
topic(), headers());
- }
-
- @Override
- public Optional<RecordMetadata> recordMetadata() {
- return Optional.of(recordContext());
- }
-
- @Override
- public void setRecordContext(final ProcessorRecordContext recordContext) {
- setRecordMetadata(
- recordContext.topic(),
- recordContext.partition(),
- recordContext.offset(),
- recordContext.headers(),
- recordContext.timestamp()
- );
- }
-
- @Override
- public void setCurrentNode(final ProcessorNode currentNode) {
- this.currentNode = currentNode;
- }
-
- @Override
- public ProcessorNode currentNode() {
- return currentNode;
- }
-
- @Override
- public ThreadCache cache() {
- return null;
- }
-
- @Override
- public void initialize() {}
-
- @Override
- public void uninitialize() {}
-
- @Override
- public RecordCollector recordCollector() {
- return recordCollector;
- }
-
- public void setRecordCollector(final RecordCollector recordCollector) {
- this.recordCollector = recordCollector;
- }
-
- @Override
- public void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback) {
- restoreCallbacks.put(store.name(), stateRestoreCallback);
- super.register(store, stateRestoreCallback);
- }
-
- @Override
- public void register(final StateStore store,
- final StateRestoreCallback stateRestoreCallback,
- final CommitCallback checkpoint) {
- restoreCallbacks.put(store.name(), stateRestoreCallback);
- super.register(store, stateRestoreCallback);
- }
-
- public StateRestoreCallback stateRestoreCallback(final String storeName) {
- return restoreCallbacks.get(storeName);
- }
-
- @Override
- public TaskType taskType() {
- return taskType;
- }
-
- @Override
- public void logChange(final String storeName,
- final Bytes key,
- final byte[] value,
- final long timestamp,
- final Position position) {
- }
-
- @Override
- public void transitionToActive(final StreamTask streamTask, final
RecordCollector recordCollector, final ThreadCache newCache) {
- }
-
- @Override
- public void transitionToStandby(final ThreadCache newCache) {
- }
-
- @Override
- public void registerCacheFlushListener(final String namespace, final
DirtyEntryFlushListener listener) {
- }
-
- @Override
- public String changelogFor(final String storeName) {
- return "mock-changelog";
- }
-
- @Override
- public void addProcessorMetadataKeyValue(final String key, final long
value) {
- processorMetadata.put(key, value);
- }
-
- @Override
- public Long processorMetadataForKey(final String key) {
- return processorMetadata.get(key);
- }
-
- @Override
- public void setProcessorMetadata(final ProcessorMetadata metadata) {
- Objects.requireNonNull(metadata);
- processorMetadata = metadata;
- }
-
- @Override
- public ProcessorMetadata processorMetadata() {
- return processorMetadata;
- }
-
- @Override
- public <K, V> void forward(final FixedKeyRecord<K, V> record) {
- forward(new Record<>(record.key(), record.value(), record.timestamp(),
record.headers()));
- }
-
- @Override
- public <K, V> void forward(final FixedKeyRecord<K, V> record, final String
childName) {
- forward(
- new Record<>(record.key(), record.value(), record.timestamp(),
record.headers()),
- childName
- );
- }
-}
\ No newline at end of file