git commit: kafka-900; ClosedByInterruptException when high-level consumer shutdown normally; patched by Jun Rao; reviewed by Neha Narkhede

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/0.8 85c715915 -> 312ed2e67


kafka-900; ClosedByInterruptException when high-level consumer shutdown 
normally; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/312ed2e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/312ed2e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/312ed2e6

Branch: refs/heads/0.8
Commit: 312ed2e67a0bca194ee3012c61239d30d8890566
Parents: 85c7159
Author: Jun Rao jun...@gmail.com
Authored: Wed May 29 09:56:27 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 09:56:27 2013 -0700

--
 .../scala/kafka/server/AbstractFetcherThread.scala |8 +---
 1 files changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/312ed2e6/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
--
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 162c749..48100f4 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -152,9 +152,11 @@ abstract class AbstractFetcherThread(name: String, 
clientId: String, sourceBroke
   partitionsWithError += topicAndPartition
   }
 case _ =>
-  warn("error for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id),
-ErrorMapping.exceptionFor(partitionData.error))
-  partitionsWithError += topicAndPartition
+  if (isRunning.get) {
+warn("error for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id),
+  ErrorMapping.exceptionFor(partitionData.error))
+partitionsWithError += topicAndPartition
+  }
   }
 }
 }



git commit: kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/0.8 312ed2e67 -> e4f287db6


kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha 
Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4f287db
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4f287db
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4f287db

Branch: refs/heads/0.8
Commit: e4f287db6142cafdf1ddb4ebf7110180ec06f7c4
Parents: 312ed2e
Author: Jun Rao jun...@gmail.com
Authored: Wed May 29 10:00:51 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 10:00:51 2013 -0700

--
 .../kafka/controller/PartitionLeaderSelector.scala |7 +++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |   22 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala  |8 +++---
 3 files changed, 22 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
--
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 21b0e24..a47b142 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -63,13 +63,14 @@ class OfflinePartitionLeaderSelector(controllerContext: 
ControllerContext) exten
   case false =>
 ControllerStats.uncleanLeaderElectionRate.mark()
 val newLeader = liveAssignedReplicasToThisPartition.head
-warn("No broker in ISR is alive for %s. Elect leader from broker %s. There's potential data loss."
- .format(topicAndPartition, newLeader))
+warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
+ .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(",")))
 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, 
List(newLeader), currentLeaderIsrZkPathVersion + 1)
 }
   case false =>
 val newLeader = liveBrokersInIsr.head
-debug("Some broker in ISR is alive for %s. Select %d from ISR to be the leader.".format(topicAndPartition, newLeader))
+debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
+  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, 
liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
 }
 info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))

http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/server/KafkaApis.scala
--
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 93e2f04..dd88ccd 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -239,16 +239,18 @@ class KafkaApis(val requestChannel: RequestChannel,
   Runtime.getRuntime.halt(1)
   null
 case utpe: UnknownTopicOrPartitionException =>
-  warn("Produce request: " + utpe.getMessage)
+  warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+   producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage))
   new ProduceResult(topicAndPartition, utpe)
 case nle: NotLeaderForPartitionException =>
-  warn("Produce request: " + nle.getMessage)
+  warn("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
+   producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage))
   new ProduceResult(topicAndPartition, nle)
 case e =>
   BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark()
   BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark()
-  error("Error processing ProducerRequest with correlation id %d from client %s on %s:%d"
-.format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e)
+  error("Error processing ProducerRequest with correlation id %d from client %s on partition %s"
+.format(producerRequest.correlationId, 

git commit: kafka-259; Give better error message when trying to run shell scripts without having built/downloaded the jars yet; patched by Ashwanth Fernando; reviewed by Jun Rao

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/0.8 bbb161aa2 -> 436dd25a4


kafka-259; Give better error message when trying to run shell scripts without 
having built/downloaded the jars yet; patched by Ashwanth Fernando; reviewed by 
Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/436dd25a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/436dd25a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/436dd25a

Branch: refs/heads/0.8
Commit: 436dd25a4885fa13749604111000f52724eb4ccd
Parents: bbb161a
Author: Jun Rao jun...@gmail.com
Authored: Wed May 29 20:26:08 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 20:26:08 2013 -0700

--
 bin/kafka-run-class.sh |   16 
 1 files changed, 16 insertions(+), 0 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/436dd25a/bin/kafka-run-class.sh
--
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 978447d..1b66655 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -65,3 +65,19 @@ else
 fi
 
 $JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@"
+
+exitval=$?
+
+if [ $exitval -eq 1 ] ; then 
+   $JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH "$@" >& exception.txt
+   exception=`cat exception.txt`
+   noBuildMessage='Please build the project using sbt. Documentation is 
available at http://kafka.apache.org/'
+   pattern="(Could not find or load main class)|(java\.lang\.NoClassDefFoundError)"
+   match=`echo $exception | grep -E "$pattern"`
+   if [[ -n $match ]]; then 
+   echo $noBuildMessage
+   fi
+   rm exception.txt
+fi
+
+



git commit: kafka-891; NullPointerException in ConsoleConsumer; patched by Colin B.; reviewed by Neha Narkhede and Jun Rao

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/trunk 731ba9007 -> 3b470f56b


kafka-891; NullPointerException in ConsoleConsumer; patched by Colin B.; 
reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b470f56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b470f56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b470f56

Branch: refs/heads/trunk
Commit: 3b470f56b479f618b7e90577f9857b4e25b38c1a
Parents: 731ba90
Author: Colin B lanzaa+...@gmail.com
Authored: Wed May 29 21:25:34 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 21:25:34 2013 -0700

--
 .../scala/kafka/consumer/ConsoleConsumer.scala |4 ++--
 1 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b470f56/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
--
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala 
b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index d6c4a51..9b566b9 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -282,10 +282,10 @@ class DefaultMessageFormatter extends MessageFormatter {
   
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
 if(printKey) {
-  output.write(key)
+  output.write(if (key == null) "null".getBytes() else key)
   output.write(keySeparator)
 }
-output.write(value)
+output.write(if (value == null) "null".getBytes() else value)
 output.write(lineSeparator)
   }
 }



git commit: trivial change to fix incorrect comment in Message

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/0.8 436dd25a4 -> 492ed7a35


trivial change to fix incorrect comment in Message


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/492ed7a3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/492ed7a3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/492ed7a3

Branch: refs/heads/0.8
Commit: 492ed7a35d031b737b7c98dadcc8be777b9d2d57
Parents: 436dd25
Author: Jun Rao jun...@gmail.com
Authored: Wed May 29 21:34:11 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 21:34:11 2013 -0700

--
 core/src/main/scala/kafka/message/Message.scala |5 +++--
 1 files changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/492ed7a3/core/src/main/scala/kafka/message/Message.scala
--
diff --git a/core/src/main/scala/kafka/message/Message.scala 
b/core/src/main/scala/kafka/message/Message.scala
index 12a8368..9b6eece 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -74,8 +74,9 @@ object Message {
  * 3. 1 byte attributes identifier to allow annotations on the message 
independent of the version (e.g. compression enabled, type of codec used)
  * 4. 4 byte key length, containing length K
  * 5. K byte key
- * 6. (N - K - 10) byte payload
- * 
+ * 6. 4 byte payload length, containing length V
+ * 7. V byte payload
+ *
  * Default constructor wraps an existing ByteBuffer with the Message object 
with no change to the contents.
  */
 class Message(val buffer: ByteBuffer) {



git commit: kafka-856; Correlation id for OffsetFetch request (#2) always responds with 0; patched by Milosz Tanski; reviewed by Jun Rao

2013-05-29 Thread junrao
Updated Branches:
  refs/heads/0.8 492ed7a35 -> dc0de2925


kafka-856; Correlation id for OffsetFetch request (#2) always responds with 0; 
patched by Milosz Tanski; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc0de292
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc0de292
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc0de292

Branch: refs/heads/0.8
Commit: dc0de292531f0af0a0eba71d170c7bd2706500d8
Parents: 492ed7a
Author: Milosz Tanski mil...@adfin.com
Authored: Wed May 29 21:43:41 2013 -0700
Committer: Jun Rao jun...@gmail.com
Committed: Wed May 29 21:43:41 2013 -0700

--
 core/src/main/scala/kafka/api/OffsetRequest.scala |2 +-
 1 files changed, 1 insertions(+), 1 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/dc0de292/core/src/main/scala/kafka/api/OffsetRequest.scala
--
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala 
b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 32ebfd4..0a94a6c 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -49,7 +49,7 @@ object OffsetRequest {
 (TopicAndPartition(topic, partitionId), 
PartitionOffsetRequestInfo(time, maxNumOffsets))
   })
 })
-OffsetRequest(Map(pairs:_*), versionId = versionId, clientId = clientId, 
replicaId = replicaId)
+OffsetRequest(Map(pairs:_*), versionId= versionId, clientId = clientId, 
correlationId = correlationId, replicaId = replicaId)
   }
 }