guozhangwang commented on code in PR #12337:
URL: https://github.com/apache/kafka/pull/12337#discussion_r905557876


##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java:
##########
@@ -34,7 +34,7 @@ public class ListConsumerGroupOffsetsResult {
 
     final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
 
-    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
+    public ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future) {

Review Comment:
   We need to make this constructor public so that admin client callers can use it.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -226,23 +226,25 @@ public void shouldAlwaysCheckpointStateIfEnforced() {
 
     @Test
     public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
         stateManager.flush();
-        EasyMock.expectLastCall();
+        EasyMock.expectLastCall().once();
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
                 .andReturn(Collections.singletonMap(partition, 50L))
                 .andReturn(Collections.singletonMap(partition, 11000L))
-                .andReturn(Collections.singletonMap(partition, 11000L));
+                .andReturn(Collections.singletonMap(partition, 12000L));

Review Comment:
   These changes are follow-ups to #12279.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1961,6 +1961,31 @@ public void shouldCheckpointState() {
         EasyMock.verify(stateManager);
     }
 
+    @Test
+    public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {

Review Comment:
   These changes are follow-ups to #12279.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -300,7 +299,7 @@ private void maybeCheckpointUpdatingTasks(final long now) {
             final long elapsedMsSinceLastCommit = now - lastCommitMs;
             if (elapsedMsSinceLastCommit > commitIntervalMs) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+                    log.debug("Checkpointing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",

Review Comment:
   These changes are follow-ups to #12279.


