[ https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500879#comment-16500879 ]

ASF GitHub Bot commented on KAFKA-5919:
---------------------------------------

lindong28 closed pull request #5126: KAFKA-5919: Adding checks on "version" field for tools using it
URL: https://github.com/apache/kafka/pull/5126

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 2715490ec23..9d426b1a0ae 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -22,31 +22,51 @@ import java.util.Properties
 
 import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.AdminCommandFailedException
-import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Json}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
+import kafka.utils.json.JsonValue
 
 /**
  * A command for delete records of the given partitions down to the specified offset.
  */
 object DeleteRecordsCommand {
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     execute(args, System.out)
   }
 
   def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
-    Json.parseFull(jsonData).toSeq.flatMap { js =>
-      js.asJsonObject.get("partitions").toSeq.flatMap { partitionsJs =>
-        partitionsJs.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
-          val topic = partitionJs("topic").to[String]
-          val partition = partitionJs("partition").to[Int]
-          val offset = partitionJs("offset").to[Long]
-          new TopicPartition(topic, partition) -> offset
-        }.toBuffer
-      }
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseJsonData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] = {
+    version match {
+      case 1 =>
+        js.asJsonObject.get("partitions") match {
+          case Some(partitions) => {
+            partitions.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
+              val topic = partitionJs("topic").to[String]
+              val partition = partitionJs("partition").to[Int]
+              val offset = partitionJs("offset").to[Long]
+              new TopicPartition(topic, partition) -> offset
+            }.toBuffer
+          }
+          case _ => throw new AdminOperationException("Missing partitions field")
+        }
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
   }
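
For reference, a minimal offsets JSON that the updated parser accepts; the
topic name, partition and offset below are illustrative placeholders, not
taken from the patch:

    // Hypothetical input: "my-topic", partition 0 and offset 100 are placeholders.
    val jsonData =
      """{"version": 1,
        | "partitions": [{"topic": "my-topic", "partition": 0, "offset": 100}]}""".stripMargin
    // Yields Seq(new TopicPartition("my-topic", 0) -> 100L). Omitting "version"
    // falls back to EarliestVersion (1); an unknown version or a missing
    // "partitions" field throws AdminOperationException.
    val offsets = DeleteRecordsCommand.parseOffsetJsonStringWithoutDedup(jsonData)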
 
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f765b94a889..4d9da90bc69 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -25,6 +25,7 @@ import kafka.log.LogConfig
 import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
+import kafka.utils.json.JsonValue
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
@@ -45,6 +46,8 @@ object ReassignPartitionsCommand extends Logging {
   private[admin] val NoThrottle = Throttle(-1, -1)
   private[admin] val AnyLogDir = "any"
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
@@ -168,7 +171,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
-    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign 
contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
@@ -241,32 +244,70 @@ object ReassignPartitionsCommand extends Logging {
     ).asJava)
   }
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parseTopicsData(jsonData: String): Seq[String] = {
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseTopicsData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseTopicsData(version: Int, js: JsonValue): Seq[String] = {
+    version match {
+      case 1 =>
+        for {
+          partitionsSeq <- js.asJsonObject.get("topics").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } yield p.asJsonObject("topic").to[String]
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
+    }
+  }
+
   def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
-    val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
-    val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } {
-      val partitionFields = p.asJsonObject
-      val topic = partitionFields("topic").to[String]
-      val partition = partitionFields("partition").to[Int]
-      val newReplicas = partitionFields("replicas").to[Seq[Int]]
-      val newLogDirs = partitionFields.get("log_dirs") match {
-        case Some(jsonValue) => jsonValue.to[Seq[String]]
-        case None => newReplicas.map(_ => AnyLogDir)
-      }
-      if (newReplicas.size != newLogDirs.size)
-        throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
-          s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
-      partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
-      replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
-        new TopicPartitionReplica(topic, partition, replica) -> logDir
-      }.filter(_._2 != AnyLogDir)
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parsePartitionReassignmentData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parsePartitionReassignmentData(version: Int, jsonData: JsonValue): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    version match {
+      case 1 =>
+        val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
+        val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
+        for {
+          partitionsSeq <- jsonData.asJsonObject.get("partitions").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } {
+          val partitionFields = p.asJsonObject
+          val topic = partitionFields("topic").to[String]
+          val partition = partitionFields("partition").to[Int]
+          val newReplicas = partitionFields("replicas").to[Seq[Int]]
+          val newLogDirs = partitionFields.get("log_dirs") match {
+            case Some(jsonValue) => jsonValue.to[Seq[String]]
+            case None => newReplicas.map(_ => AnyLogDir)
+          }
+          if (newReplicas.size != newLogDirs.size)
+            throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
+              s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
+          partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
+          replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
+            new TopicPartitionReplica(topic, partition, replica) -> logDir
+          }.filter(_._2 != AnyLogDir)
+        }
+        (partitionAssignment, replicaAssignment)
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
-    (partitionAssignment, replicaAssignment)
   }
 
   def parseAndValidate(zkClient: KafkaZkClient, reassignmentJsonString: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0c162434541..01799370ef8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -152,14 +152,6 @@ object ZkUtils {
     assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
   }
 
-  def parseTopicsData(jsonData: String): Seq[String] = {
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("topics").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } yield p.asJsonObject("topic").to[String]
-  }
-
   def controllerZkData(brokerId: Int, timestamp: Long): String = {
     Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
   }
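
Similarly, the two new ReassignPartitionsCommand parsers above accept the same
optional "version" field. A sketch of both input shapes, again with purely
illustrative topic names and broker ids:

    // topics-to-move JSON consumed by parseTopicsData (--generate):
    val topicsJson = """{"version": 1, "topics": [{"topic": "my-topic"}]}"""
    // parseTopicsData(topicsJson) yields Seq("my-topic")

    // reassignment JSON consumed by parsePartitionReassignmentData (--execute);
    // "log_dirs" is optional and defaults to "any" for each replica:
    val reassignJson =
      """{"version": 1,
        | "partitions": [{"topic": "my-topic", "partition": 0,
        |                 "replicas": [1, 2], "log_dirs": ["any", "any"]}]}""".stripMargin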


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Adding checks on "version" field for tools using it
> ---------------------------------------------------
>
>                 Key: KAFKA-5919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5919
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> Hi,
> the kafka-delete-records script allows the user to pass information about 
> records to delete through a JSON file. Such a file, as described in the 
> command help, is made up of a "partitions" array and a "version" field. 
> Reading 
> [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient]
>  and the DeleteRecords API (Key: 21) description, it's not clear what that 
> field is for, and it isn't used at all in the current implementation.
> It turns out that the field exists for backward compatibility, in case the 
> JSON format changes in the future. This JIRA is about adding more checks on 
> the "version" field: it is not mandatory, but the earliest version 
> (currently 1) is assumed when it's omitted from the JSON file.
> The same applies to kafka-reassign-partitions, which takes a topics-to-move 
> JSON file as input (used with the --generate option) and a 
> partitions-to-move.json file (used with the --execute option). In both cases 
> the same logic applies.
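
For context, a typical set of invocations that consume the JSON files described
above; the hosts, file names and broker ids are illustrative:

    # delete records below the given offsets
    bin/kafka-delete-records.sh --bootstrap-server localhost:9092 \
        --offset-json-file offsets.json

    # generate a candidate reassignment from a topics-to-move file
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

    # execute a reassignment
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
        --reassignment-json-file partitions-to-move.json --execute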



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
