[ https://issues.apache.org/jira/browse/KAFKA-5919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16500879#comment-16500879 ]
ASF GitHub Bot commented on KAFKA-5919:
---------------------------------------

lindong28 closed pull request #5126: KAFKA-5919: Adding checks on "version" field for tools using it
URL: https://github.com/apache/kafka/pull/5126

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
index 2715490ec23..9d426b1a0ae 100644
--- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
+++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala
@@ -22,31 +22,51 @@ import java.util.Properties
 
 import kafka.admin.AdminClient.DeleteRecordsResult
 import kafka.common.AdminCommandFailedException
-import kafka.utils.{CoreUtils, Json, CommandLineUtils}
+import kafka.utils.{CommandLineUtils, CoreUtils, Json}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.clients.CommonClientConfigs
 import joptsimple._
+import kafka.utils.json.JsonValue
 
 /**
  * A command for delete records of the given partitions down to the specified offset.
  */
 object DeleteRecordsCommand {
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     execute(args, System.out)
   }
 
   def parseOffsetJsonStringWithoutDedup(jsonData: String): Seq[(TopicPartition, Long)] = {
-    Json.parseFull(jsonData).toSeq.flatMap { js =>
-      js.asJsonObject.get("partitions").toSeq.flatMap { partitionsJs =>
-        partitionsJs.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
-          val topic = partitionJs("topic").to[String]
-          val partition = partitionJs("partition").to[Int]
-          val offset = partitionJs("offset").to[Long]
-          new TopicPartition(topic, partition) -> offset
-        }.toBuffer
-      }
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseJsonData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseJsonData(version: Int, js: JsonValue): Seq[(TopicPartition, Long)] = {
+    version match {
+      case 1 =>
+        js.asJsonObject.get("partitions") match {
+          case Some(partitions) =>
+            partitions.asJsonArray.iterator.map(_.asJsonObject).map { partitionJs =>
+              val topic = partitionJs("topic").to[String]
+              val partition = partitionJs("partition").to[Int]
+              val offset = partitionJs("offset").to[Long]
+              new TopicPartition(topic, partition) -> offset
+            }.toBuffer
+          case _ => throw new AdminOperationException("Missing partitions field")
+        }
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
   }
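For reference, the input file accepted by parseOffsetJsonStringWithoutDedup and parseJsonData above has the following shape (topic, partition, and offset values are illustrative; per the code above, "version" may be omitted and then defaults to 1):

  {
    "version": 1,
    "partitions": [
      { "topic": "my-topic", "partition": 0, "offset": 10 }
    ]
  }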
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index f765b94a889..4d9da90bc69 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -25,6 +25,7 @@ import kafka.log.LogConfig
 import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils._
+import kafka.utils.json.JsonValue
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo
 import org.apache.kafka.clients.admin.{AdminClientConfig, AlterReplicaLogDirsOptions, AdminClient => JAdminClient}
@@ -45,6 +46,8 @@ object ReassignPartitionsCommand extends Logging {
   private[admin] val NoThrottle = Throttle(-1, -1)
   private[admin] val AnyLogDir = "any"
 
+  private[admin] val EarliestVersion = 1
+
   def main(args: Array[String]): Unit = {
     val opts = validateAndParseArgs(args)
     val zkConnect = opts.options.valueOf(opts.zkConnectOpt)
@@ -168,7 +171,7 @@ object ReassignPartitionsCommand extends Logging {
   }
 
   def generateAssignment(zkClient: KafkaZkClient, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]]) = {
-    val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val topicsToReassign = parseTopicsData(topicsToMoveJsonString)
     val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign)
     if (duplicateTopicsToReassign.nonEmpty)
       throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(",")))
@@ -241,32 +244,70 @@ object ReassignPartitionsCommand extends Logging {
     ).asJava)
   }
 
-  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parseTopicsData(jsonData: String): Seq[String] = {
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parseTopicsData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  def parseTopicsData(version: Int, js: JsonValue): Seq[String] = {
+    version match {
+      case 1 =>
+        for {
+          partitionsSeq <- js.asJsonObject.get("topics").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } yield p.asJsonObject("topic").to[String]
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
+    }
+  }
+
   def parsePartitionReassignmentData(jsonData: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
-    val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
-    val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("partitions").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } {
-      val partitionFields = p.asJsonObject
-      val topic = partitionFields("topic").to[String]
-      val partition = partitionFields("partition").to[Int]
-      val newReplicas = partitionFields("replicas").to[Seq[Int]]
-      val newLogDirs = partitionFields.get("log_dirs") match {
-        case Some(jsonValue) => jsonValue.to[Seq[String]]
-        case None => newReplicas.map(_ => AnyLogDir)
-      }
-      if (newReplicas.size != newLogDirs.size)
-        throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
-          s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
-      partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
-      replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
-        new TopicPartitionReplica(topic, partition, replica) -> logDir
-      }.filter(_._2 != AnyLogDir)
+    Json.parseFull(jsonData) match {
+      case Some(js) =>
+        val version = js.asJsonObject.get("version") match {
+          case Some(jsonValue) => jsonValue.to[Int]
+          case None => EarliestVersion
+        }
+        parsePartitionReassignmentData(version, js)
+      case None => throw new AdminOperationException("The input string is not a valid JSON")
+    }
+  }
+
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed
+  def parsePartitionReassignmentData(version: Int, jsonData: JsonValue): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
+    version match {
+      case 1 =>
+        val partitionAssignment = mutable.ListBuffer.empty[(TopicPartition, Seq[Int])]
+        val replicaAssignment = mutable.Map.empty[TopicPartitionReplica, String]
+        for {
+          partitionsSeq <- jsonData.asJsonObject.get("partitions").toSeq
+          p <- partitionsSeq.asJsonArray.iterator
+        } {
+          val partitionFields = p.asJsonObject
+          val topic = partitionFields("topic").to[String]
+          val partition = partitionFields("partition").to[Int]
+          val newReplicas = partitionFields("replicas").to[Seq[Int]]
+          val newLogDirs = partitionFields.get("log_dirs") match {
+            case Some(jsonValue) => jsonValue.to[Seq[String]]
+            case None => newReplicas.map(_ => AnyLogDir)
+          }
+          if (newReplicas.size != newLogDirs.size)
+            throw new AdminCommandFailedException(s"Size of replicas list $newReplicas is different from " +
+              s"size of log dirs list $newLogDirs for partition ${new TopicPartition(topic, partition)}")
+          partitionAssignment += (new TopicPartition(topic, partition) -> newReplicas)
+          replicaAssignment ++= newReplicas.zip(newLogDirs).map { case (replica, logDir) =>
+            new TopicPartitionReplica(topic, partition, replica) -> logDir
+          }.filter(_._2 != AnyLogDir)
+        }
+        (partitionAssignment, replicaAssignment)
+      case _ => throw new AdminOperationException(s"Not supported version field value $version")
     }
-    (partitionAssignment, replicaAssignment)
   }
 
   def parseAndValidate(zkClient: KafkaZkClient, reassignmentJsonString: String): (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String]) = {
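The topics-to-move file consumed by parseTopicsData above (the input to the --generate option) then has this shape (topic names are illustrative; "version" is again optional and defaults to 1):

  {
    "version": 1,
    "topics": [
      { "topic": "foo" },
      { "topic": "bar" }
    ]
  }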
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0c162434541..01799370ef8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -152,14 +152,6 @@ object ZkUtils {
     assignments.map { case (tp, p) => (new TopicAndPartition(tp), p) }
   }
 
-  def parseTopicsData(jsonData: String): Seq[String] = {
-    for {
-      js <- Json.parseFull(jsonData).toSeq
-      partitionsSeq <- js.asJsonObject.get("topics").toSeq
-      p <- partitionsSeq.asJsonArray.iterator
-    } yield p.asJsonObject("topic").to[String]
-  }
-
   def controllerZkData(brokerId: Int, timestamp: Long): String = {
     Json.legacyEncodeAsString(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
   }
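Finally, the reassignment file parsed by parsePartitionReassignmentData above (the input to the --execute option) looks like this (values are illustrative; per the code above, "log_dirs" is optional and, when present, must have the same size as "replicas"):

  {
    "version": 1,
    "partitions": [
      { "topic": "foo", "partition": 0, "replicas": [1, 2], "log_dirs": ["any", "/data/kafka-logs-1"] }
    ]
  }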
----------------------------------------------------------------

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Adding checks on "version" field for tools using it
> ---------------------------------------------------
>
>                 Key: KAFKA-5919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5919
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>             Fix For: 2.0.0
>
> Hi,
> the kafka-delete-records script allows the user to pass information about the records to delete through a JSON file. That file, as described in the command help, consists of a "partitions" array and a "version" field. Reading [KIP-107|https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient] and the description of the DeleteRecords API (Key: 21), it is not clear what this field is for, and it is not even used in the current implementation.
> It turned out that the field exists for backward compatibility, so that the JSON format can change in the future. This JIRA is about adding more checks on the "version" field: it is not mandatory, but the earliest version (currently 1) is assumed if it is omitted from the JSON file.
> The same applies to kafka-reassign-partitions, which takes a topics-to-move JSON file as input (used with the --generate option) and the partitions-to-move.json file (used with the --execute option). In both cases the same logic as above can be applied.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
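To make the version-defaulting rule described in the issue concrete, here is a minimal sketch, assuming the kafka.utils.Json and kafka.utils.json APIs used in the diff above; VersionFieldSketch and versionOf are hypothetical names introduced for illustration, not code from the PR:

  import kafka.utils.Json

  object VersionFieldSketch {
    // Mirrors the EarliestVersion constant introduced by the patch (hypothetical copy)
    private val EarliestVersion = 1

    // Reads the "version" field of a JSON input, falling back to the
    // earliest version when the field is omitted
    def versionOf(jsonData: String): Int =
      Json.parseFull(jsonData) match {
        case Some(js) =>
          js.asJsonObject.get("version") match {
            case Some(v) => v.to[Int]
            case None => EarliestVersion
          }
        case None => throw new IllegalArgumentException("The input string is not a valid JSON")
      }

    def main(args: Array[String]): Unit = {
      println(versionOf("""{"version": 1, "partitions": []}""")) // prints 1
      println(versionOf("""{"partitions": []}"""))               // prints 1: field omitted, earliest version assumed
    }
  }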