This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5d9892413d KAFKA-19779: Add per-partition epoch validation to streams 
groups  [4/N] (#20760)
d5d9892413d is described below

commit d5d9892413ddae0b376f85d8ac179b8b4ca06ef6
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Nov 6 13:23:16 2025 +0100

    KAFKA-19779: Add per-partition epoch validation to streams groups  [4/N] 
(#20760)
    
    This change enhances the offset commit validation logic in streams
    groups to validate against per-partition assignment epochs. When a
    member attempts to commit offsets with an older member epoch, the logic
    now validates that the epoch is not older than the assignment epoch for
    each individual partition being committed.
    
    The implementation adds a new `createAssignmentEpochValidator` method
    that creates partition-level validators, checking each partition against
    its assignment epoch from either assigned tasks or tasks pending
    revocation.
    
    We extend the SmokeTestDriverIntegrationTest to detect if we have
    processed more records than needed, which, in this restricted scenario,
    should only happen when offset commits are failing.
    
    We re-enable the previously flaky test in EosIntegrationTest, which
    failed due to previously failing offset commits.
    
    Both tests have been run 100x in their streams protocol variation to
    validate that they are not flaky anymore.
    
    Reviewers: David Jacot <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 build.gradle                                       |   1 -
 .../coordinator/group/streams/StreamsGroup.java    |  64 +++++++++++-
 .../group/OffsetMetadataManagerTest.java           |  74 ++++++++++++++
 .../group/streams/StreamsGroupTest.java            | 107 ++++++++++++++++++++-
 .../streams/integration/EosIntegrationTest.java    |   2 -
 .../SmokeTestDriverIntegrationTest.java            |  19 +++-
 .../kafka/streams/tests/SmokeTestClient.java       |   8 +-
 .../apache/kafka/streams/tests/SmokeTestUtil.java  |   6 ++
 8 files changed, 272 insertions(+), 9 deletions(-)

diff --git a/build.gradle b/build.gradle
index 6b7fefbf333..a1768cfac66 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
     testImplementation libs.mockitoCore
     testImplementation testLog4j2Libs
     testImplementation project(':streams:test-utils')
-    testImplementation project(':test-common:test-common-util')
 
     testRuntimeOnly runtimeTestLibs
   }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 30a5652a2ac..192ddada4de 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -32,6 +32,7 @@ import org.apache.kafka.coordinator.group.Group;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
 import org.apache.kafka.coordinator.group.Utils;
+import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -726,8 +727,17 @@ public class StreamsGroup implements Group {
                 "by members using the streams group protocol");
         }
 
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
-        return CommitPartitionValidator.NO_OP;
+        if (memberEpoch == member.memberEpoch()) {
+            return CommitPartitionValidator.NO_OP;
+        }
+
+        if (memberEpoch > member.memberEpoch()) {
+            throw new StaleMemberEpochException(String.format("Received member 
epoch %d is newer than " +
+                "current member epoch %d.", memberEpoch, 
member.memberEpoch()));
+        }
+
+        // Member epoch is older; validate against per-partition assignment 
epochs.
+        return createAssignmentEpochValidator(member, memberEpoch);
     }
 
     /**
@@ -1120,4 +1130,54 @@ public class StreamsGroup implements Group {
             this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
         }
     }
+
+    /**
+     * Creates a validator that checks if the received member epoch is valid 
for each partition's assignment epoch.
+     *
+     * @param member The member whose assignments are being validated.
+     * @param receivedMemberEpoch The received member epoch.
+     * @return A validator for per-partition validation.
+     */
+    private CommitPartitionValidator createAssignmentEpochValidator(
+        final StreamsGroupMember member,
+        int receivedMemberEpoch
+    ) {
+        // Retrieve topology once for all partitions - not per partition!
+        final StreamsTopology streamsTopology = topology.get().orElseThrow(() 
->
+            new StaleMemberEpochException("Topology is not available for 
offset commit validation."));
+        
+        final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
+        final TasksTupleWithEpochs tasksPendingRevocation = 
member.tasksPendingRevocation();
+
+        return (topicName, topicId, partitionId) -> {
+            final StreamsGroupTopologyValue.Subtopology subtopology = 
streamsTopology.sourceTopicMap().get(topicName);
+            if (subtopology == null) {
+                throw new StaleMemberEpochException("Topic " + topicName + " 
is not in the topology.");
+            }
+
+            final String subtopologyId = subtopology.subtopologyId();
+
+            // Search for the partition in assigned tasks, then in tasks 
pending revocation
+            Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
+                .getOrDefault(subtopologyId, Collections.emptyMap())
+                .get(partitionId);
+            if (assignmentEpoch == null) {
+                assignmentEpoch = 
tasksPendingRevocation.activeTasksWithEpochs()
+                    .getOrDefault(subtopologyId, Collections.emptyMap())
+                    .get(partitionId);
+            }
+
+            if (assignmentEpoch == null) {
+                throw new StaleMemberEpochException(String.format(
+                    "Task %s-%d is not assigned or pending revocation for 
member.",
+                    subtopologyId, partitionId));
+            }
+
+            if (receivedMemberEpoch < assignmentEpoch) {
+                throw new StaleMemberEpochException(String.format(
+                    "Received member epoch %d is older than assignment epoch 
%d for task %s-%d.",
+                    receivedMemberEpoch, assignmentEpoch, subtopologyId, 
partitionId));
+            }
+        };
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 672cc37c414..275a3eeccc4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -66,6 +66,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsGroup;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
 import org.apache.kafka.coordinator.group.streams.StreamsTopology;
+import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -3659,4 +3660,77 @@ public class OffsetMetadataManagerTest {
             )
         );
     }
+
+    @Test
+    public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        StreamsGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+        // Setup: topology with topic "bar" in subtopology "0"
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("bar")))));
+
+        // Member at epoch 10, with partitions assigned at epoch 4 and 5 
respectively.
+        group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+            .setMemberEpoch(10)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 4, 1, 5)),
+                Map.of(), Map.of()))
+            .build());
+
+        // Commit with member epoch 5 should succeed (5 >= assignment epoch 5)
+        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result 
= context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(5)
+                .setTopics(List.of(new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L),
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(1)
+                            .setCommittedOffset(200L))))));
+
+        assertEquals(Errors.NONE.code(), 
result.response().topics().get(0).partitions().get(0).errorCode());
+        assertEquals(Errors.NONE.code(), 
result.response().topics().get(0).partitions().get(1).errorCode());
+        assertEquals(2, result.records().size());
+    }
+
+    @Test
+    public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+        StreamsGroup group = 
context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
+
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("bar")))));
+
+        // Member at epoch 10, with partitions assigned at different epochs
+        group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
+            .setMemberEpoch(10)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 5, 1, 8)),
+                Map.of(), Map.of()))
+            .build());
+
+        // Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8)
+        assertThrows(StaleMemberEpochException.class, () -> 
context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(3)
+                .setTopics(List.of(new 
OffsetCommitRequestData.OffsetCommitRequestTopic()
+                    .setName("bar")
+                    .setPartitions(List.of(
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(0)
+                            .setCommittedOffset(100L),
+                        new 
OffsetCommitRequestData.OffsetCommitRequestPartition()
+                            .setPartitionIndex(1)
+                            .setCommittedOffset(200L)))))));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
index 6842c2aafa3..de1bf2d82c8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
+import org.apache.kafka.coordinator.group.CommitPartitionValidator;
 import org.apache.kafka.coordinator.group.OffsetAndMetadata;
 import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
 import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
@@ -660,7 +661,7 @@ public class StreamsGroupTest {
         assertThrows(UnknownMemberIdException.class, () ->
             group.validateOffsetCommit("", null, -1, isTransactional, 
version));
 
-        // The member epoch is stale.
+        // The member epoch is stale (newer than current).
         if (version >= 9) {
             assertThrows(StaleMemberEpochException.class, () ->
                 group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
@@ -669,7 +670,7 @@ public class StreamsGroupTest {
                 group.validateOffsetCommit("new-protocol-member-id", "", 10, 
isTransactional, version));
         }
 
-        // This should succeed.
+        // This should succeed (matching member epoch).
         if (version >= 9) {
             group.validateOffsetCommit("new-protocol-member-id", "", 0, 
isTransactional, version);
         } else {
@@ -678,6 +679,108 @@ public class StreamsGroupTest {
         }
     }
 
+    @Test
+    public void testValidateOffsetCommitWithOlderEpoch() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+        
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+        
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 2, 1, 1)),
+                Map.of(), Map.of()))
+            .build());
+        
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+        
+        // Received epoch (1) < assignment epoch (2) should throw
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("input-topic", Uuid.ZERO_UUID, 0));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+        
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .build());
+        
+        // Topology is retrieved when creating validator, so exception is 
thrown here
+        assertThrows(StaleMemberEpochException.class, () ->
+            group.validateOffsetCommit("member-1", "", 1, false, 
ApiKeys.OFFSET_COMMIT.latestVersion()));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+        
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+        
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .build());
+        
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+        
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+        
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+        
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(2)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 1)),
+                Map.of(), Map.of()))
+            .setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
+            .build());
+        
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+
+        // Partition 1 not assigned should throw
+        assertThrows(StaleMemberEpochException.class, () ->
+            validator.validate("input-topic", Uuid.ZERO_UUID, 1));
+    }
+
+    @Test
+    public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
+        StreamsGroup group = createStreamsGroup("group-foo");
+        
+        group.setTopology(new StreamsTopology(1, Map.of("0", new 
StreamsGroupTopologyValue.Subtopology()
+            .setSubtopologyId("0")
+            .setSourceTopics(List.of("input-topic")))));
+        
+        group.updateMember(new StreamsGroupMember.Builder("member-1")
+            .setMemberEpoch(5)
+            .setAssignedTasks(new TasksTupleWithEpochs(
+                Map.of("0", Map.of(0, 2, 1, 2)),
+                Map.of(), Map.of()))
+            .build());
+        
+        CommitPartitionValidator validator = group.validateOffsetCommit(
+            "member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
+        
+        // Received epoch 2 == assignment epoch 2 should succeed
+        validator.validate("input-topic", Uuid.ZERO_UUID, 0);
+        validator.validate("input-topic", Uuid.ZERO_UUID, 1);
+    }
+
     @Test
     public void testAsListedGroup() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 194025dd4d0..46ace65cf04 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -35,7 +35,6 @@ import 
org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -396,7 +395,6 @@ public class EosIntegrationTest {
         }
     }
 
-    @Flaky("KAFKA-19816")
     @ParameterizedTest
     @MethodSource("groupProtocolAndProcessingThreadsParameters")
     public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, 
final boolean processingThreadsEnabled) throws Exception {
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 4e59e9523c4..3a6b15e29b1 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -47,6 +47,7 @@ import java.util.Set;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -140,11 +141,14 @@ public class SmokeTestDriverIntegrationTest {
             throw new AssertionError("Test called halt(). code:" + statusCode 
+ " message:" + message);
         });
         int numClientsCreated = 0;
+        int numDataRecordsProcessed = 0;
+        final int numKeys = 10;
+        final int maxRecordsPerKey = 1000;
 
         IntegrationTestUtils.cleanStateBeforeTest(cluster, 
SmokeTestDriver.topics());
 
         final String bootstrapServers = cluster.bootstrapServers();
-        final Driver driver = new Driver(bootstrapServers, 10, 1000);
+        final Driver driver = new Driver(bootstrapServers, numKeys, 
maxRecordsPerKey);
         driver.start();
         System.out.println("started driver");
 
@@ -183,6 +187,7 @@ public class SmokeTestDriverIntegrationTest {
                     assertFalse(client.error(), "The streams application seems 
to have crashed.");
                     Thread.sleep(100);
                 }
+                numDataRecordsProcessed += client.totalDataRecordsProcessed();
             }
         }
 
@@ -201,6 +206,7 @@ public class SmokeTestDriverIntegrationTest {
                     assertFalse(client.error(), "The streams application seems 
to have crashed.");
                     Thread.sleep(100);
                 }
+                numDataRecordsProcessed += client.totalDataRecordsProcessed();
             }
         }
 
@@ -210,5 +216,16 @@ public class SmokeTestDriverIntegrationTest {
             throw new AssertionError(driver.exception());
         }
         assertTrue(driver.result().passed(), driver.result().result());
+
+        // The one extra record is a record that the driver produces to flush 
suppress
+        final int expectedRecords = numKeys * maxRecordsPerKey + 1;
+
+        // We check that we did not have to reprocess any records, which would 
indicate a bug since everything
+        // runs locally in this test.
+        assertEquals(expectedRecords, numDataRecordsProcessed,
+            String.format("It seems we had to reprocess records, expected %d 
records, processed %d records.",
+                expectedRecords,
+                numDataRecordsProcessed)
+        );
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 7f8057c5597..b0012fa61b4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -44,6 +44,7 @@ import java.time.Instant;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
@@ -55,6 +56,7 @@ public class SmokeTestClient extends SmokeTestUtil {
     private boolean uncaughtException = false;
     private volatile boolean closed;
     private volatile boolean error;
+    private final AtomicInteger totalDataRecordsProcessed = new 
AtomicInteger(0);
 
     private static void addShutdownHook(final String name, final Runnable 
runnable) {
         if (name != null) {
@@ -76,6 +78,10 @@ public class SmokeTestClient extends SmokeTestUtil {
         return error;
     }
 
+    public int totalDataRecordsProcessed() {
+        return totalDataRecordsProcessed.get();
+    }
+
     public void start(final Properties streamsProperties) {
         final Topology build = getTopology();
         streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
@@ -156,7 +162,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         source.filterNot((k, v) -> k.equals("flush"))
               .to("echo", Produced.with(stringSerde, intSerde));
         final KStream<String, Integer> data = source.filter((key, value) -> 
value == null || value != END);
-        data.process(SmokeTestUtil.printProcessorSupplier("data", name));
+        data.process(SmokeTestUtil.printProcessorSupplier("data", name, 
totalDataRecordsProcessed));
 
         // min
         final KGroupedStream<String, Integer> groupedData = 
data.groupByKey(Grouped.with(stringSerde, intSerde));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java 
b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
index 7e670802b93..d0ad6c8cabb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -29,6 +29,7 @@ import 
org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
 
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class SmokeTestUtil {
 
@@ -39,6 +40,10 @@ public class SmokeTestUtil {
     }
 
     static ProcessorSupplier<Object, Object, Void, Void> 
printProcessorSupplier(final String topic, final String name) {
+        return printProcessorSupplier(topic, name, new AtomicInteger());
+    }
+
+    static ProcessorSupplier<Object, Object, Void, Void> 
printProcessorSupplier(final String topic, final String name, final 
AtomicInteger totalRecordsProcessed) {
         return () -> new ContextualProcessor<>() {
             private int numRecordsProcessed = 0;
             private long smallestOffset = Long.MAX_VALUE;
@@ -84,6 +89,7 @@ public class SmokeTestUtil {
                 }
                 System.out.println("offset " + smallestOffset + " to " + 
largestOffset + " -> processed " + processed);
                 System.out.flush();
+                totalRecordsProcessed.addAndGet(numRecordsProcessed);
             }
         };
     }

Reply via email to