This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5d9892413d KAFKA-19779: Add per-partition epoch validation to streams groups [4/N] (#20760)
d5d9892413d is described below
commit d5d9892413ddae0b376f85d8ac179b8b4ca06ef6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Nov 6 13:23:16 2025 +0100
KAFKA-19779: Add per-partition epoch validation to streams groups [4/N] (#20760)
This change extends the offset commit validation logic in streams
groups to check per-partition assignment epochs. When a member attempts
to commit offsets with an older member epoch, the logic now verifies
that the epoch is not older than the assignment epoch of each
individual partition being committed.
The implementation adds a new `createAssignmentEpochValidator` method
that creates partition-level validators, checking each partition against
its assignment epoch from either assigned tasks or tasks pending
revocation.
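
In essence, the per-partition rule reduces to the check sketched below
(a minimal sketch with assumed names, not the exact Kafka code; the
real lookup and the StaleMemberEpochException are in the
StreamsGroup.java hunk further down):

    // Accept a commit for a partition iff the received member epoch is at
    // least the epoch at which that partition was assigned. A missing
    // assignment epoch means the partition is neither assigned nor pending
    // revocation, so the commit is rejected as stale.
    static void checkPartitionEpoch(final int receivedMemberEpoch, final Integer assignmentEpoch) {
        if (assignmentEpoch == null) {
            throw new IllegalStateException("Partition is not assigned or pending revocation.");
        }
        if (receivedMemberEpoch < assignmentEpoch) {
            throw new IllegalStateException("Member epoch " + receivedMemberEpoch
                + " is older than assignment epoch " + assignmentEpoch + ".");
        }
    }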
We extend the SmokeTestDriverIntegrationTest to detect whether we have
processed more records than needed, which, in this restricted scenario,
should only happen when offset commits are failing.
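
Concretely, the over-processing check added to the test boils down to
the following arithmetic (values taken from the diff below):

    final int numKeys = 10;
    final int maxRecordsPerKey = 1000;
    // Every data record should be processed exactly once; the driver also
    // produces one extra record to flush suppress: 10 * 1000 + 1 = 10001.
    final int expectedRecords = numKeys * maxRecordsPerKey + 1;
    assertEquals(expectedRecords, numDataRecordsProcessed);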
We re-enable the previously flaky test in EosIntegrationTest, which had
been failing due to failed offset commits.
Both tests have been run 100x in their streams protocol variants to
validate that they are no longer flaky.
Reviewers: David Jacot <[email protected]>, Matthias J. Sax <[email protected]>
---
build.gradle | 1 -
.../coordinator/group/streams/StreamsGroup.java | 64 +++++++++++-
.../group/OffsetMetadataManagerTest.java | 74 ++++++++++++++
.../group/streams/StreamsGroupTest.java | 107 ++++++++++++++++++++-
.../streams/integration/EosIntegrationTest.java | 2 -
.../SmokeTestDriverIntegrationTest.java | 19 +++-
.../kafka/streams/tests/SmokeTestClient.java | 8 +-
.../apache/kafka/streams/tests/SmokeTestUtil.java | 6 ++
8 files changed, 272 insertions(+), 9 deletions(-)
diff --git a/build.gradle b/build.gradle
index 6b7fefbf333..a1768cfac66 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
testImplementation project(':streams:test-utils')
- testImplementation project(':test-common:test-common-util')
testRuntimeOnly runtimeTestLibs
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 30a5652a2ac..192ddada4de 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -32,6 +32,7 @@ import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@@ -726,8 +727,17 @@ public class StreamsGroup implements Group {
"by members using the streams group protocol");
}
- validateMemberEpoch(memberEpoch, member.memberEpoch());
- return CommitPartitionValidator.NO_OP;
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+
+ if (memberEpoch > member.memberEpoch()) {
+ throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than " +
+ "current member epoch %d.", memberEpoch, member.memberEpoch()));
+ }
+
+ // Member epoch is older; validate against per-partition assignment epochs.
+ return createAssignmentEpochValidator(member, memberEpoch);
}
/**
@@ -1120,4 +1130,54 @@ public class StreamsGroup implements Group {
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
}
}
+
+ /**
+ * Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
+ *
+ * @param member The member whose assignments are being validated.
+ * @param receivedMemberEpoch The received member epoch.
+ * @return A validator for per-partition validation.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ final StreamsGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ // Retrieve topology once for all partitions - not per partition!
+ final StreamsTopology streamsTopology = topology.get().orElseThrow(() ->
+ new StaleMemberEpochException("Topology is not available for offset commit validation."));
+
+ final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
+ final TasksTupleWithEpochs tasksPendingRevocation = member.tasksPendingRevocation();
+
+ return (topicName, topicId, partitionId) -> {
+ final StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName);
+ if (subtopology == null) {
+ throw new StaleMemberEpochException("Topic " + topicName + "
is not in the topology.");
+ }
+
+ final String subtopologyId = subtopology.subtopologyId();
+
+ // Search for the partition in assigned tasks, then in tasks pending revocation
+ Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
+ .getOrDefault(subtopologyId, Collections.emptyMap())
+ .get(partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs()
+ .getOrDefault(subtopologyId, Collections.emptyMap())
+ .get(partitionId);
+ }
+
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(String.format(
+ "Task %s-%d is not assigned or pending revocation for
member.",
+ subtopologyId, partitionId));
+ }
+
+ if (receivedMemberEpoch < assignmentEpoch) {
+ throw new StaleMemberEpochException(String.format(
+ "Received member epoch %d is older than assignment epoch
%d for task %s-%d.",
+ receivedMemberEpoch, assignmentEpoch, subtopologyId,
partitionId));
+ }
+ };
+ }
}
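
For reference, the returned CommitPartitionValidator is meant to be
invoked once per committed partition. A hedged usage sketch (the loop
shape and the topic-id resolution are illustrative, not copied from
OffsetMetadataManager; the validate signature matches the tests below):

    final CommitPartitionValidator validator = group.validateOffsetCommit(
        memberId, memberInstanceId, memberEpoch, isTransactional, apiVersion);
    for (final OffsetCommitRequestData.OffsetCommitRequestTopic topic : request.topics()) {
        final Uuid topicId = Uuid.ZERO_UUID; // topic-id resolution elided in this sketch
        for (final OffsetCommitRequestData.OffsetCommitRequestPartition partition : topic.partitions()) {
            // Throws StaleMemberEpochException if the received epoch is older
            // than the partition's assignment epoch.
            validator.validate(topic.name(), topicId, partition.partitionIndex());
        }
    }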
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 672cc37c414..275a3eeccc4 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -3659,4 +3660,77 @@ public class OffsetMetadataManagerTest {
)
);
}
+
+ @Test
+ public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
+ OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+ StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+ // Setup: topology with topic "bar" in subtopology "0"
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("bar")))));
+
+ // Member at epoch 10, with partitions assigned at epochs 4 and 5, respectively.
+ group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+ .setMemberEpoch(10)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 4, 1, 5)),
+ Map.of(), Map.of()))
+ .build());
+
+ // Commit with member epoch 5 should succeed (5 >= assignment epochs 4 and 5)
+ CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
+ new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(5)
+ .setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200L))))));
+
+ assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(0).errorCode());
+ assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(1).errorCode());
+ assertEquals(2, result.records().size());
+ }
+
+ @Test
+ public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
+ OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+ StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("bar")))));
+
+ // Member at epoch 10, with partitions assigned at different epochs
+ group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+ .setMemberEpoch(10)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 5, 1, 8)),
+ Map.of(), Map.of()))
+ .build());
+
+ // Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8)
+ assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(
+ new OffsetCommitRequestData()
+ .setGroupId("foo")
+ .setMemberId("member")
+ .setGenerationIdOrMemberEpoch(3)
+ .setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
+ .setName("bar")
+ .setPartitions(List.of(
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100L),
+ new OffsetCommitRequestData.OffsetCommitRequestPartition()
+ .setPartitionIndex(1)
+ .setCommittedOffset(200L)))))));
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 6842c2aafa3..de1bf2d82c8 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -660,7 +661,7 @@ public class StreamsGroupTest {
assertThrows(UnknownMemberIdException.class, () ->
group.validateOffsetCommit("", null, -1, isTransactional,
version));
- // The member epoch is stale.
+ // The member epoch is stale (newer than current).
if (version >= 9) {
assertThrows(StaleMemberEpochException.class, () ->
group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
@@ -669,7 +670,7 @@ public class StreamsGroupTest {
group.validateOffsetCommit("new-protocol-member-id", "", 10,
isTransactional, version));
}
- // This should succeed.
+ // This should succeed (matching member epoch).
if (version >= 9) {
group.validateOffsetCommit("new-protocol-member-id", "", 0,
isTransactional, version);
} else {
@@ -678,6 +679,108 @@ public class StreamsGroupTest {
}
}
+ @Test
+ public void testValidateOffsetCommitWithOlderEpoch() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2, 1, 1)),
+ Map.of(), Map.of()))
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Received epoch (1) < assignment epoch (2) should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ // Topology is retrieved when creating the validator, so the exception is thrown here
+ assertThrows(StaleMemberEpochException.class, () ->
+ group.validateOffsetCommit("member-1", "", 1, false,
ApiKeys.OFFSET_COMMIT.latestVersion()));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(2)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 1)),
+ Map.of(), Map.of()))
+ .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Partition 1 not assigned should throw
+ assertThrows(StaleMemberEpochException.class, () ->
+ validator.validate("input-topic", Uuid.ZERO_UUID, 1));
+ }
+
+ @Test
+ public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
+ StreamsGroup group = createStreamsGroup("group-foo");
+
+ group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
+ .setSubtopologyId("0")
+ .setSourceTopics(List.of("input-topic")))));
+
+ group.updateMember(new StreamsGroupMember.Builder("member-1")
+ .setMemberEpoch(5)
+ .setAssignedTasks(new TasksTupleWithEpochs(
+ Map.of("0", Map.of(0, 2, 1, 2)),
+ Map.of(), Map.of()))
+ .build());
+
+ CommitPartitionValidator validator = group.validateOffsetCommit(
+ "member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+ // Received epoch 2 == assignment epoch 2 should succeed
+ validator.validate("input-topic", Uuid.ZERO_UUID, 0);
+ validator.validate("input-topic", Uuid.ZERO_UUID, 1);
+ }
+
@Test
public void testAsListedGroup() {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 194025dd4d0..46ace65cf04 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -396,7 +395,6 @@ public class EosIntegrationTest {
}
}
- @Flaky("KAFKA-19816")
@ParameterizedTest
@MethodSource("groupProtocolAndProcessingThreadsParameters")
public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 4e59e9523c4..3a6b15e29b1 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -47,6 +47,7 @@ import java.util.Set;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -140,11 +141,14 @@ public class SmokeTestDriverIntegrationTest {
throw new AssertionError("Test called halt(). code:" + statusCode
+ " message:" + message);
});
int numClientsCreated = 0;
+ int numDataRecordsProcessed = 0;
+ final int numKeys = 10;
+ final int maxRecordsPerKey = 1000;
IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());
final String bootstrapServers = cluster.bootstrapServers();
- final Driver driver = new Driver(bootstrapServers, 10, 1000);
+ final Driver driver = new Driver(bootstrapServers, numKeys, maxRecordsPerKey);
driver.start();
System.out.println("started driver");
@@ -183,6 +187,7 @@ public class SmokeTestDriverIntegrationTest {
assertFalse(client.error(), "The streams application seems to have crashed.");
Thread.sleep(100);
}
+ numDataRecordsProcessed += client.totalDataRecordsProcessed();
}
}
@@ -201,6 +206,7 @@ public class SmokeTestDriverIntegrationTest {
assertFalse(client.error(), "The streams application seems to have crashed.");
Thread.sleep(100);
}
+ numDataRecordsProcessed += client.totalDataRecordsProcessed();
}
}
@@ -210,5 +216,16 @@ public class SmokeTestDriverIntegrationTest {
throw new AssertionError(driver.exception());
}
assertTrue(driver.result().passed(), driver.result().result());
+
+ // The one extra record is the one the driver produces to flush suppress.
+ final int expectedRecords = numKeys * maxRecordsPerKey + 1;
+
+ // We check that we did not have to reprocess any records, which would indicate a bug since everything
+ // runs locally in this test.
+ assertEquals(expectedRecords, numDataRecordsProcessed,
+ String.format("It seems we had to reprocess records, expected %d
records, processed %d records.",
+ expectedRecords,
+ numDataRecordsProcessed)
+ );
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 7f8057c5597..b0012fa61b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -44,6 +44,7 @@ import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
@@ -55,6 +56,7 @@ public class SmokeTestClient extends SmokeTestUtil {
private boolean uncaughtException = false;
private volatile boolean closed;
private volatile boolean error;
+ private final AtomicInteger totalDataRecordsProcessed = new AtomicInteger(0);
private static void addShutdownHook(final String name, final Runnable runnable) {
if (name != null) {
@@ -76,6 +78,10 @@ public class SmokeTestClient extends SmokeTestUtil {
return error;
}
+ public int totalDataRecordsProcessed() {
+ return totalDataRecordsProcessed.get();
+ }
+
public void start(final Properties streamsProperties) {
final Topology build = getTopology();
streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
@@ -156,7 +162,7 @@ public class SmokeTestClient extends SmokeTestUtil {
source.filterNot((k, v) -> k.equals("flush"))
.to("echo", Produced.with(stringSerde, intSerde));
final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
- data.process(SmokeTestUtil.printProcessorSupplier("data", name));
+ data.process(SmokeTestUtil.printProcessorSupplier("data", name,
totalDataRecordsProcessed));
// min
final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 7e670802b93..d0ad6c8cabb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
public class SmokeTestUtil {
@@ -39,6 +40,10 @@ public class SmokeTestUtil {
}
static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
+ return printProcessorSupplier(topic, name, new AtomicInteger());
+ }
+
+ static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name, final AtomicInteger totalRecordsProcessed) {
return () -> new ContextualProcessor<>() {
private int numRecordsProcessed = 0;
private long smallestOffset = Long.MAX_VALUE;
@@ -84,6 +89,7 @@ public class SmokeTestUtil {
}
System.out.println("offset " + smallestOffset + " to " +
largestOffset + " -> processed " + processed);
System.out.flush();
+ totalRecordsProcessed.addAndGet(numRecordsProcessed);
}
};
}