change to standardize on [%s,%d] for partition

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a61e7381
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a61e7381
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a61e7381

Branch: refs/heads/trunk
Commit: a61e7381382ada885debef15379675fea6974ec1
Parents: e37a464
Author: Neha Narkhede <[email protected]>
Authored: Mon Apr 29 11:09:14 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Mon Apr 29 11:09:19 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  2 +-
 .../main/scala/kafka/cluster/Partition.scala    | 21 +++++++++++---------
 core/src/main/scala/kafka/cluster/Replica.scala | 10 +++++-----
 .../kafka/consumer/ConsumerFetcherThread.scala  |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  2 +-
 .../kafka/producer/BrokerPartitionInfo.scala    |  4 ++--
 .../kafka/server/AbstractFetcherThread.scala    |  6 +++---
 .../kafka/server/HighwaterMarkCheckpoint.scala  |  5 ++---
 .../src/main/scala/kafka/server/KafkaApis.scala |  9 +++++----
 .../kafka/server/ReplicaFetcherThread.scala     |  2 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   | 10 +++++-----
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  8 ++++----
 .../ZookeeperConsumerConnectorTest.scala        |  4 ++--
 .../test/scala/unit/kafka/utils/TestUtils.scala | 12 +++++------
 14 files changed, 50 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 63f5bc8..c7652ad 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -117,7 +117,7 @@ object AdminUtils extends Logging {
               try {
                 Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, 
List(l)).head)
               } catch {
-                case e => throw new LeaderNotAvailableException("Leader not 
available for topic %s partition %d".format(topic, partition), e)
+                case e => throw new LeaderNotAvailableException("Leader not 
available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader 
exists for partition " + partition)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 9a29fb2..02d2c44 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -221,7 +221,7 @@ class Partition(val topic: String,
           if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= 
leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d from %s to %s"
+            info("Expanding ISR for partition [%s,%d] from %s to %s"
                  .format(topic, partitionId, 
inSyncReplicas.map(_.brokerId).mkString(","), 
newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
@@ -270,10 +270,10 @@ class Partition(val topic: String,
     val oldHighWatermark = leaderReplica.highWatermark
     if(newHighWatermark > oldHighWatermark) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("Highwatermark for topic %s partition %d updated to 
%d".format(topic, partitionId, newHighWatermark))
+      debug("Highwatermark for partition [%s,%d] updated to %d".format(topic, 
partitionId, newHighWatermark))
     }
     else
-      debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's 
are %s"
+      debug("Old hw for partition [%s,%d] is %d. New hw is %d. All leo's are 
%s"
         .format(topic, partitionId, oldHighWatermark, newHighWatermark, 
allLogEndOffsets.mkString(",")))
   }
 
@@ -285,7 +285,7 @@ class Partition(val topic: String,
           if(outOfSyncReplicas.size > 0) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.size > 0)
-            info("Shrinking ISR for topic %s partition %d from %s to 
%s".format(topic, partitionId,
+            info("Shrinking ISR for partition [%s,%d] from %s to 
%s".format(topic, partitionId,
               inSyncReplicas.map(_.brokerId).mkString(","), 
newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
@@ -310,13 +310,16 @@ class Partition(val topic: String,
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
     val possiblyStuckReplicas = candidateReplicas.filter(r => r.logEndOffset < 
leaderLogEndOffset)
-    debug("Possibly stuck replicas for topic %s partition %d are 
%s".format(topic, partitionId,
-      possiblyStuckReplicas.map(_.brokerId).mkString(",")))
+    if(possiblyStuckReplicas.size > 0)
+      debug("Possibly stuck replicas for partition [%s,%d] are 
%s".format(topic, partitionId,
+        possiblyStuckReplicas.map(_.brokerId).mkString(",")))
     val stuckReplicas = possiblyStuckReplicas.filter(r => 
r.logEndOffsetUpdateTimeMs < (time.milliseconds - keepInSyncTimeMs))
-    debug("Stuck replicas for topic %s partition %d are %s".format(topic, 
partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
+    if(stuckReplicas.size > 0)
+      debug("Stuck replicas for partition [%s,%d] are %s".format(topic, 
partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     // Case 2 above
     val slowReplicas = candidateReplicas.filter(r => r.logEndOffset >= 0 && 
(leaderLogEndOffset - r.logEndOffset) > keepInSyncMessages)
-    debug("Slow replicas for topic %s partition %d are %s".format(topic, 
partitionId, slowReplicas.map(_.brokerId).mkString(",")))
+    if(slowReplicas.size > 0)
+      debug("Slow replicas for partition [%s,%d] are %s".format(topic, 
partitionId, slowReplicas.map(_.brokerId).mkString(",")))
     stuckReplicas ++ slowReplicas
   }
 
@@ -338,7 +341,7 @@ class Partition(val topic: String,
   }
 
   private def updateIsr(newIsr: Set[Replica]) {
-    debug("Updated ISR for topic %s partition %d to %s".format(topic, 
partitionId, newIsr.mkString(",")))
+    debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, 
newIsr.mkString(",")))
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.map(r => r.brokerId).toList, zkVersion)
     // use the epoch of the controller that made the leadership decision, 
instead of the current controller epoch
     val (updateSucceeded, newVersion) = 
ZkUtils.conditionalUpdatePersistentPath(zkClient,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala 
b/core/src/main/scala/kafka/cluster/Replica.scala
index 321ab58..5e659b4 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -40,10 +40,10 @@ class Replica(val brokerId: Int,
     if (!isLocal) {
       logEndOffsetValue.set(newLogEndOffset)
       logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
-      trace("Setting log end offset for replica %d for topic %s partition %d 
to %d"
+      trace("Setting log end offset for replica %d for partition [%s,%d] to %d"
             .format(brokerId, topic, partitionId, logEndOffsetValue.get()))
     } else
-      throw new KafkaException("Shouldn't set logEndOffset for replica %d 
topic %s partition %d since it's local"
+      throw new KafkaException("Shouldn't set logEndOffset for replica %d 
partition [%s,%d] since it's local"
           .format(brokerId, topic, partitionId))
 
   }
@@ -66,11 +66,11 @@ class Replica(val brokerId: Int,
 
   def highWatermark_=(newHighWatermark: Long) {
     if (isLocal) {
-      trace("Setting hw for replica %d topic %s partition %d on broker %d to 
%d"
+      trace("Setting hw for replica %d partition [%s,%d] on broker %d to %d"
               .format(brokerId, topic, partitionId, brokerId, 
newHighWatermark))
       highWatermarkValue.set(newHighWatermark)
     } else
-      throw new KafkaException("Unable to set highwatermark for replica %d 
topic %s partition %d since it's not local"
+      throw new KafkaException("Unable to set highwatermark for replica %d 
partition [%s,%d] since it's not local"
               .format(brokerId, topic, partitionId))
   }
 
@@ -78,7 +78,7 @@ class Replica(val brokerId: Int,
     if (isLocal)
       highWatermarkValue.get()
     else
-      throw new KafkaException("Unable to get highwatermark for replica %d 
topic %s partition %d since it's not local"
+      throw new KafkaException("Unable to get highwatermark for replica %d 
partition [%s,%d] since it's not local"
               .format(brokerId, topic, partitionId))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 80df1b5..5f9c902 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -45,7 +45,7 @@ class ConsumerFetcherThread(name: String,
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: 
Long, partitionData: FetchResponsePartitionData) {
     val pti = partitionMap(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
-      throw new RuntimeException("Offset doesn't match for topic %s partition: 
%d pti offset: %d fetch offset: %d"
+      throw new RuntimeException("Offset doesn't match for partition [%s,%d] 
pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, 
topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
     pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 497cfdd..4771d11 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -197,7 +197,7 @@ private[kafka] class LogManager(val config: KafkaConfig,
                     config.logIndexIntervalBytes, 
                     time, 
                     config.brokerId)
-      info("Created log for topic %s partition %d in 
%s.".format(topicAndPartition.topic, topicAndPartition.partition, 
dataDir.getAbsolutePath))
+      info("Created log for partition [%s,%d] in 
%s.".format(topicAndPartition.topic, topicAndPartition.partition, 
dataDir.getAbsolutePath))
       logs.put(topicAndPartition, log)
       log
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala 
b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 72597ef..82e6e4d 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -57,10 +57,10 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
     partitionMetadata.map { m =>
       m.leader match {
         case Some(leader) =>
-          debug("Topic %s partition %d has leader %d".format(topic, 
m.partitionId, leader.id))
+          debug("Partition [%s,%d] has leader %d".format(topic, m.partitionId, 
leader.id))
           new PartitionAndLeader(topic, m.partitionId, Some(leader.id))
         case None =>
-          debug("Topic %s partition %d does not have a leader 
yet".format(topic, m.partitionId))
+          debug("Partition [%s,%d] does not have a leader yet".format(topic, 
m.partitionId))
           new PartitionAndLeader(topic, m.partitionId, None)
       }
     }.sortWith((s, t) => s.partitionId < t.partitionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 2ac7a17..162c749 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -144,15 +144,15 @@ abstract class AbstractFetcherThread(name: String, 
clientId: String, sourceBroke
                   try {
                     val newOffset = handleOffsetOutOfRange(topicAndPartition)
                     partitionMap.put(topicAndPartition, newOffset)
-                    warn("current offset %d for topic %s partition %d out of 
range; reset offset to %d"
+                    warn("current offset %d for partition [%s,%d] out of 
range; reset offset to %d"
                       .format(currentOffset.get, topic, partitionId, 
newOffset))
                   } catch {
                     case e =>
-                      warn("error getting offset for %s %d to broker 
%d".format(topic, partitionId, sourceBroker.id), e)
+                      warn("error getting offset for partition [%s,%d] to 
broker %d".format(topic, partitionId, sourceBroker.id), e)
                       partitionsWithError += topicAndPartition
                   }
                 case _ =>
-                  warn("error for %s %d to broker %d".format(topic, 
partitionId, sourceBroker.id),
+                  warn("error for partition [%s,%d] to broker 
%d".format(topic, partitionId, sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala 
b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
index 5aa0141..30caec1 100644
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
@@ -77,8 +77,7 @@ class HighwaterMarkCheckpoint(val path: String) extends 
Logging {
     try {
       hwFile.length() match {
         case 0 => 
-          warn("No highwatermark file is found. Returning 0 as the 
highwatermark for topic %s partition %d."
-            .format(topic, partition))
+          warn("No highwatermark file is found. Returning 0 as the 
highwatermark for partition [%s,%d]".format(topic, partition))
           0L
         case _ =>
           val hwFileReader = new BufferedReader(new FileReader(hwFile))
@@ -99,7 +98,7 @@ class HighwaterMarkCheckpoint(val path: String) extends 
Logging {
               val hwOpt = 
partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
               hwOpt match {
                 case Some(hw) => 
-                  debug("Read hw %d for topic %s partition %d from 
highwatermark checkpoint file".format(hw, topic, partition))
+                  debug("Read hw %d for partition [%s,%d] from highwatermark 
checkpoint file".format(hw, topic, partition))
                   hw
                 case None => 
                   warn("No previously checkpointed highwatermark value found 
for topic %s ".format(topic) +

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6b6f8f2..bb178d6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -252,7 +252,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val response = new FetchResponse(fetchRequest.correlationId, dataRead)
       requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(response)))
     } else {
-      debug("Putting fetch request into purgatory")
+      debug("Putting fetch request with correlation id %d from client %s into 
purgatory".format(fetchRequest.correlationId,
+        fetchRequest.clientId))
       // create a list of (topic, partition) pairs to use as keys for this 
delayed request
       val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new 
RequestKey(_))
       val delayedFetch = new DelayedFetch(delayedFetchKeys, request, 
fetchRequest, fetchRequest.maxWait, bytesReadable)
@@ -285,7 +286,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             if (!isFetchFromFollower) {
               new FetchResponsePartitionData(ErrorMapping.NoError, 
highWatermark, messages)
             } else {
-              debug("Leader %d for topic %s partition %d received fetch 
request from follower %d"
+              debug("Leader %d for partition [%s,%d] received fetch request 
from follower %d"
                             .format(brokerId, topic, partition, 
fetchRequest.replicaId))
               new FetchResponsePartitionData(ErrorMapping.NoError, 
highWatermark, messages)
             }
@@ -302,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             case t =>
               
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
               
BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
-              error("Error when processing fetch request for topic %s 
partition %d offset %d from %s with correlation id %d"
+              error("Error when processing fetch request for partition [%s,%d] 
offset %d from %s with correlation id %d"
                     .format(topic, partition, offset, if (isFetchFromFollower) 
"follower" else "consumer", fetchRequest.correlationId),
                     t)
               new 
FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
 -1L, MessageSet.Empty)
@@ -334,7 +335,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(log) =>
         log.read(offset, maxSize, maxOffsetOpt)
       case None =>
-        error("Leader for topic %s partition %d on broker %d does not have a 
local log".format(topic, partition, brokerId))
+        error("Leader for partition [%s,%d] on broker %d does not have a local 
log".format(topic, partition, brokerId))
         MessageSet.Empty
     }
     (messages, localReplica.highWatermark)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index b733fa3..74073d0 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -54,7 +54,7 @@ class ReplicaFetcherThread(name:String,
             .format(replica.brokerId, replica.logEndOffset, 
messageSet.sizeInBytes))
       val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
       replica.highWatermark = followerHighWatermark
-      trace("Follower %d set replica highwatermark for topic %s partition %d 
to %d"
+      trace("Follower %d set replica highwatermark for partition [%s,%d] to %d"
             .format(replica.brokerId, topic, partitionId, 
followerHighWatermark))
     } catch {
       case e: KafkaStorageException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala 
b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
index dc6d066..eac9af2 100644
--- a/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
+++ b/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
@@ -99,7 +99,7 @@ object VerifyConsumerRebalance extends Logging {
       partitions.foreach { partition =>
       // check if there is a node for [partition]
         if(!partitionsWithOwners.exists(p => p.equals(partition))) {
-          error("No owner for topic %s partition %s".format(topic, partition))
+          error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
         // try reading the partition owner path for see if a valid consumer id 
exists there
@@ -109,7 +109,7 @@ object VerifyConsumerRebalance extends Logging {
           case None => null
         }
         if(partitionOwner == null) {
-          error("No owner for topic %s partition %s".format(topic, partition))
+          error("No owner for partition [%s,%d]".format(topic, partition))
           rebalanceSucceeded = false
         }
         else {
@@ -117,12 +117,12 @@ object VerifyConsumerRebalance extends Logging {
           consumerIdsForTopic match {
             case Some(consumerIds) =>
               if(!consumerIds.contains(partitionOwner)) {
-                error("Owner %s for topic %s partition %s is not a valid 
member of consumer " +
-                  "group %s".format(partitionOwner, topic, partition, group))
+                error(("Owner %s for partition [%s,%d] is not a valid member 
of consumer " +
+                  "group %s").format(partitionOwner, topic, partition, group))
                 rebalanceSucceeded = false
               }
               else
-                info("Owner of topic %s partition %s is %s".format(topic, 
partition, partitionOwner))
+                info("Owner of partition [%s,%d] is %s".format(topic, 
partition, partitionOwner))
             case None => {
               error("No consumer ids registered for topic " + topic)
               rebalanceSucceeded = false

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 7971a09..4f6fcd4 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -101,7 +101,7 @@ object ZkUtils extends Logging {
         val isr = leaderIsrAndEpochInfo.get("isr").get.asInstanceOf[List[Int]]
         val controllerEpoch = 
leaderIsrAndEpochInfo.get("controller_epoch").get.asInstanceOf[Int]
         val zkPathVersion = stat.getVersion
-        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s 
and partition %d".format(leader, epoch,
+        debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for partition 
[%s,%d]".format(leader, epoch,
           isr.toString(), zkPathVersion, topic, partition))
         Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, 
zkPathVersion), controllerEpoch))
       case None => None
@@ -131,10 +131,10 @@ object ZkUtils extends Logging {
     leaderAndIsrOpt match {
       case Some(leaderAndIsr) =>
         Json.parseFull(leaderAndIsr) match {
-          case None => throw new NoEpochForPartitionException("No epoch, 
leaderAndISR data for topic %s partition %d is invalid".format(topic, 
partition))
+          case None => throw new NoEpochForPartitionException("No epoch, 
leaderAndISR data for partition [%s,%d] is invalid".format(topic, partition))
           case Some(m) => m.asInstanceOf[Map[String, 
Any]].get("leader_epoch").get.asInstanceOf[Int]
         }
-      case None => throw new NoEpochForPartitionException("No epoch, ISR path 
for topic %s partition %d is empty"
+      case None => throw new NoEpochForPartitionException("No epoch, ISR path 
for partition [%s,%d] is empty"
         .format(topic, partition))
     }
   }
@@ -177,7 +177,7 @@ object ZkUtils extends Logging {
 
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, 
brokerId: Int): Boolean = {
     val replicas = getReplicasForPartition(zkClient, topic, partition)
-    debug("The list of replicas for topic %s, partition %d is 
%s".format(topic, partition, replicas))
+    debug("The list of replicas for partition [%s,%d] is %s".format(topic, 
partition, replicas))
     replicas.contains(brokerId.toString)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 86d30ad..4d989e4 100644
--- 
a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -338,7 +338,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite 
with KafkaServerTestHar
     val producer: Producer[Int, String] = new Producer[Int, String](new 
ProducerConfig(props))
     val ms = 0.until(numMessages).map(x => header + config.brokerId + "-" + 
partition + "-" + x)
     producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, 
m)):_*)
-    debug("Sent %d messages to broker %d for topic %s and partition 
%d".format(ms.size, config.brokerId, topic, partition))
+    debug("Sent %d messages to broker %d for partition 
[%s,%d]".format(ms.size, config.brokerId, topic, partition))
     producer.close()
     ms.toList
   }
@@ -359,7 +359,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite 
with KafkaServerTestHar
       val ms = 0.until(messagesPerNode).map(x => header + config.brokerId + 
"-" + partition + "-" + x)
       producer.send(ms.map(m => new KeyedMessage[Int, String](topic, 
partition, m)):_*)
       messages ++= ms
-      debug("Sent %d messages to broker %d for topic %s and partition 
%d".format(ms.size, config.brokerId, topic, partition))
+      debug("Sent %d messages to broker %d for partition 
[%s,%d]".format(ms.size, config.brokerId, topic, partition))
     }
     producer.close()
     messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/a61e7381/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 68c134e..f9c9e64 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -408,7 +408,7 @@ object TestUtils extends Logging {
           ZkUtils.updatePersistentPath(zkClient, 
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
             ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch))
         } catch {
-          case oe => error("Error while electing leader for topic %s partition 
%d".format(topic, partition), oe)
+          case oe => error("Error while electing leader for partition 
[%s,%d]".format(topic, partition), oe)
         }
       }
     }
@@ -419,9 +419,9 @@ object TestUtils extends Logging {
     val leaderExistsOrChanged = leaderLock.newCondition()
 
     if(oldLeaderOpt == None)
-      info("Waiting for leader to be elected for topic %s partition 
%d".format(topic, partition))
+      info("Waiting for leader to be elected for partition 
[%s,%d]".format(topic, partition))
     else
-      info("Waiting for leader for topic %s partition %d to be changed from 
old leader %d".format(topic, partition, oldLeaderOpt.get))
+      info("Waiting for leader for partition [%s,%d] to be changed from old 
leader %d".format(topic, partition, oldLeaderOpt.get))
 
     leaderLock.lock()
     try {
@@ -432,10 +432,10 @@ object TestUtils extends Logging {
       leader match {
         case Some(l) =>
           if(oldLeaderOpt == None)
-            info("Leader %d is elected for topic %s partition %d".format(l, 
topic, partition))
+            info("Leader %d is elected for partition [%s,%d]".format(l, topic, 
partition))
           else
-            info("Leader for topic %s partition %d is changed from %d to 
%d".format(topic, partition, oldLeaderOpt.get, l))
-        case None => error("Timing out after %d ms since leader is not elected 
for topic %s partition %d"
+            info("Leader for partition [%s,%d] is changed from %d to 
%d".format(topic, partition, oldLeaderOpt.get, l))
+        case None => error("Timing out after %d ms since leader is not elected 
for partition [%s,%d]"
                                    .format(timeoutMs, topic, partition))
       }
       leader

Reply via email to