This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 669a490  MINOR: Fix an incompatible bug in GetOffsetShell (#11936)
669a490 is described below

commit 669a49063db814b7d7181f0798bd9822bdf5a3d4
Author: dengziming <[email protected]>
AuthorDate: Thu Mar 31 10:34:39 2022 +0800

    MINOR: Fix an incompatible bug in GetOffsetShell (#11936)
    
    In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In
the previous implementation, partitions were simply ignored if there was no
offset for them; since that change, -1 is printed for them instead. This PR
fixes this inconsistency by skipping such partitions again.
    
    Reviewers: David Jacot <[email protected]>, Luke Chen <[email protected]>
---
 core/src/main/scala/kafka/tools/GetOffsetShell.scala     | 8 ++++++--
 core/src/test/scala/kafka/tools/GetOffsetShellTest.scala | 7 +++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala 
b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index d4e81bc..03f9c81 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -22,7 +22,7 @@ import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, 
ListTopicsOptions, OffsetSpec}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.requests.ListOffsetsRequest
+import org.apache.kafka.common.requests.{ListOffsetsRequest, 
ListOffsetsResponse}
 import org.apache.kafka.common.utils.Utils
 
 import java.util.Properties
@@ -135,7 +135,11 @@ object GetOffsetShell {
       val partitionOffsets = partitionInfos.flatMap { tp =>
         try {
           val partitionInfo = listOffsetsResult.partitionResult(tp).get
-          Some((tp, partitionInfo.offset))
+          if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
+            Some((tp, partitionInfo.offset))
+          } else {
+            None
+          }
         } catch {
           case e: ExecutionException =>
             e.getCause match {
diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala 
b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
index 02164266..cbce573 100644
--- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
+++ b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
@@ -167,6 +167,13 @@ class GetOffsetShellTest extends KafkaServerTestHarness 
with Logging {
   }
 
   @Test
+  def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {
+    val time = (System.currentTimeMillis() * 2).toString
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*", 
"--time", time))
+    assertEquals(List.empty, offsets)
+  }
+
+  @Test
   def testTopicPartitionsArgWithInternalExcluded(): Unit = {
     val offsets = executeAndParse(Array("--topic-partitions",
       "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"))

Reply via email to