This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25457377e3 HOTFIX: fix broken trunk due to conflicting and overlapping commits (#12074)
25457377e3 is described below
commit 25457377e3883390b313fb3e6e7f8d1308e5a5ea
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 20 14:39:15 2022 -0700
HOTFIX: fix broken trunk due to conflicting and overlapping commits (#12074)
Reviewers: Victoria Xia <[email protected]>, David Arthur <[email protected]>
---
.../TimeOrderedCachingPersistentWindowStoreTest.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index dd8b54662d..697e3c6f51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
-import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -204,14 +205,13 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
builder.stream(TOPIC,
Consumed.with(Serdes.String(), Serdes.String()))
- .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
+ .process(() -> new Processor<String, String, String, String>() {
private WindowStore<String, ValueAndTimestamp<String>> store;
private int numRecordsProcessed;
- private ProcessorContext context;
+ private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;
- @SuppressWarnings("unchecked")
@Override
- public void init(final ProcessorContext processorContext) {
+ public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<String, String> processorContext) {
this.context = processorContext;
this.store = processorContext.getStateStore("store-name");
int count = 0;
@@ -227,7 +227,7 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
}
@Override
- public KeyValue<String, String> transform(final String key, final String value) {
+ public void process(final Record<String, String> record) {
int count = 0;
try (final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>> all = store.all()) {
@@ -239,13 +239,14 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
assertThat(count, equalTo(numRecordsProcessed));
- store.put(value, ValueAndTimestamp.make(value, context.timestamp()), context.timestamp());
+ store.put(record.value(), ValueAndTimestamp.make(record.value(), record.timestamp()), record.timestamp());
numRecordsProcessed++;
- return new KeyValue<>(key, value);
+ context.forward(record);
}
+
@Override
public void close() {
}