git commit: kafka-900; ClosedByInterruptException when high-level consumer shutdown normally; patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches: refs/heads/0.8 85c715915 - 312ed2e67 kafka-900; ClosedByInterruptException when high-level consumer shutdown normally; patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/312ed2e6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/312ed2e6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/312ed2e6 Branch: refs/heads/0.8 Commit: 312ed2e67a0bca194ee3012c61239d30d8890566 Parents: 85c7159 Author: Jun Rao jun...@gmail.com Authored: Wed May 29 09:56:27 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 09:56:27 2013 -0700 -- .../scala/kafka/server/AbstractFetcherThread.scala |8 +--- 1 files changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/312ed2e6/core/src/main/scala/kafka/server/AbstractFetcherThread.scala -- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 162c749..48100f4 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -152,9 +152,11 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke partitionsWithError += topicAndPartition } case _ = - warn(error for partition [%s,%d] to broker %d.format(topic, partitionId, sourceBroker.id), -ErrorMapping.exceptionFor(partitionData.error)) - partitionsWithError += topicAndPartition + if (isRunning.get) { +warn(error for partition [%s,%d] to broker %d.format(topic, partitionId, sourceBroker.id), + ErrorMapping.exceptionFor(partitionData.error)) +partitionsWithError += topicAndPartition + } } } }
git commit: kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches: refs/heads/0.8 312ed2e67 - e4f287db6 kafka-649; Cleanup log4j logging (extra); patched by Jun Rao; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e4f287db Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e4f287db Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e4f287db Branch: refs/heads/0.8 Commit: e4f287db6142cafdf1ddb4ebf7110180ec06f7c4 Parents: 312ed2e Author: Jun Rao jun...@gmail.com Authored: Wed May 29 10:00:51 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 10:00:51 2013 -0700 -- .../kafka/controller/PartitionLeaderSelector.scala |7 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 22 +- core/src/main/scala/kafka/utils/ZkUtils.scala |8 +++--- 3 files changed, 22 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala -- diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 21b0e24..a47b142 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -63,13 +63,14 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten case false = ControllerStats.uncleanLeaderElectionRate.mark() val newLeader = liveAssignedReplicasToThisPartition.head -warn(No broker in ISR is alive for %s. Elect leader from broker %s. There's potential data loss. - .format(topicAndPartition, newLeader)) +warn(No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss. + .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(,))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) } case false = val newLeader = liveBrokersInIsr.head -debug(Some broker in ISR is alive for %s. Select %d from ISR to be the leader..format(topicAndPartition, newLeader)) +debug(Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader. + .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(,))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) } info(Selected new leader and ISR %s for offline partition %s.format(newLeaderAndIsr.toString(), topicAndPartition)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e4f287db/core/src/main/scala/kafka/server/KafkaApis.scala -- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 93e2f04..dd88ccd 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -239,16 +239,18 @@ class KafkaApis(val requestChannel: RequestChannel, Runtime.getRuntime.halt(1) null case utpe: UnknownTopicOrPartitionException = - warn(Produce request: + utpe.getMessage) + warn(Produce request with correlation id %d from client %s on partition %s failed due to %s.format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, utpe.getMessage)) new ProduceResult(topicAndPartition, utpe) case nle: NotLeaderForPartitionException = - warn(Produce request: + nle.getMessage) + warn(Produce request with correlation id %d from client %s on partition %s failed due to %s.format( + producerRequest.correlationId, producerRequest.clientId, topicAndPartition, nle.getMessage)) new ProduceResult(topicAndPartition, nle) case e = BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).failedProduceRequestRate.mark() BrokerTopicStats.getBrokerAllTopicsStats.failedProduceRequestRate.mark() - error(Error processing ProducerRequest with correlation id %d from client %s on %s:%d -.format(producerRequest.correlationId, producerRequest.clientId, topicAndPartition.topic, topicAndPartition.partition), e) + error(Error processing ProducerRequest with correlation id %d from client %s on partition %s +.format(producerRequest.correlationId,
git commit: kafka-259; Give better error message when trying to run shell scripts without having built/downloaded the jars yet; patched by Ashwanth Fernando; reviewed by Jun Rao
Updated Branches: refs/heads/0.8 bbb161aa2 - 436dd25a4 kafka-259; Give better error message when trying to run shell scripts without having built/downloaded the jars yet; patched by Ashwanth Fernando; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/436dd25a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/436dd25a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/436dd25a Branch: refs/heads/0.8 Commit: 436dd25a4885fa13749604111000f52724eb4ccd Parents: bbb161a Author: Jun Rao jun...@gmail.com Authored: Wed May 29 20:26:08 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 20:26:08 2013 -0700 -- bin/kafka-run-class.sh | 16 1 files changed, 16 insertions(+), 0 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/436dd25a/bin/kafka-run-class.sh -- diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 978447d..1b66655 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -65,3 +65,19 @@ else fi $JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ + +exitval=$? + +if [ $exitval -eq 1 ] ; then + $JAVA $KAFKA_OPTS $KAFKA_JMX_OPTS -cp $CLASSPATH $@ exception.txt + exception=`cat exception.txt` + noBuildMessage='Please build the project using sbt. Documentation is available at http://kafka.apache.org/' + pattern=(Could not find or load main class)|(java\.lang\.NoClassDefFoundError) + match=`echo $exception | grep -E $pattern` + if [[ -n $match ]]; then + echo $noBuildMessage + fi + rm exception.txt +fi + +
git commit: kafka-891; NullPointerException in ConsoleConsumer; patched by Colin B.; reviewed by Neha Narkhede and Jun Rao
Updated Branches: refs/heads/trunk 731ba9007 - 3b470f56b kafka-891; NullPointerException in ConsoleConsumer; patched by Colin B.; reviewed by Neha Narkhede and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b470f56 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b470f56 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b470f56 Branch: refs/heads/trunk Commit: 3b470f56b479f618b7e90577f9857b4e25b38c1a Parents: 731ba90 Author: Colin B lanzaa+...@gmail.com Authored: Wed May 29 21:25:34 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 21:25:34 2013 -0700 -- .../scala/kafka/consumer/ConsoleConsumer.scala |4 ++-- 1 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/3b470f56/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala -- diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala index d6c4a51..9b566b9 100644 --- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala @@ -282,10 +282,10 @@ class DefaultMessageFormatter extends MessageFormatter { def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { if(printKey) { - output.write(key) + output.write(if (key == null) null.getBytes() else key) output.write(keySeparator) } -output.write(value) +output.write(if (value == null) null.getBytes() else value) output.write(lineSeparator) } }
git commit: trivial change to fix incorrect comment in Message
Updated Branches: refs/heads/0.8 436dd25a4 - 492ed7a35 trivial change to fix incorrect comment in Message Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/492ed7a3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/492ed7a3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/492ed7a3 Branch: refs/heads/0.8 Commit: 492ed7a35d031b737b7c98dadcc8be777b9d2d57 Parents: 436dd25 Author: Jun Rao jun...@gmail.com Authored: Wed May 29 21:34:11 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 21:34:11 2013 -0700 -- core/src/main/scala/kafka/message/Message.scala |5 +++-- 1 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/492ed7a3/core/src/main/scala/kafka/message/Message.scala -- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 12a8368..9b6eece 100644 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -74,8 +74,9 @@ object Message { * 3. 1 byte attributes identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) * 4. 4 byte key length, containing length K * 5. K byte key - * 6. (N - K - 10) byte payload - * + * 6. 4 byte payload length, containing length V + * 7. V byte payload + * * Default constructor wraps an existing ByteBuffer with the Message object with no change to the contents. */ class Message(val buffer: ByteBuffer) {
git commit: kafka-856; Correlation id for OffsetFetch request (#2) always responds with 0; patched by Milosz Tanski; reviewed by Jun Rao
Updated Branches: refs/heads/0.8 492ed7a35 - dc0de2925 kafka-856; Correlation id for OffsetFetch request (#2) always responds with 0; patched by Milosz Tanski; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc0de292 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc0de292 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc0de292 Branch: refs/heads/0.8 Commit: dc0de292531f0af0a0eba71d170c7bd2706500d8 Parents: 492ed7a Author: Milosz Tanski mil...@adfin.com Authored: Wed May 29 21:43:41 2013 -0700 Committer: Jun Rao jun...@gmail.com Committed: Wed May 29 21:43:41 2013 -0700 -- core/src/main/scala/kafka/api/OffsetRequest.scala |2 +- 1 files changed, 1 insertions(+), 1 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/dc0de292/core/src/main/scala/kafka/api/OffsetRequest.scala -- diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala index 32ebfd4..0a94a6c 100644 --- a/core/src/main/scala/kafka/api/OffsetRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetRequest.scala @@ -49,7 +49,7 @@ object OffsetRequest { (TopicAndPartition(topic, partitionId), PartitionOffsetRequestInfo(time, maxNumOffsets)) }) }) -OffsetRequest(Map(pairs:_*), versionId = versionId, clientId = clientId, replicaId = replicaId) +OffsetRequest(Map(pairs:_*), versionId= versionId, clientId = clientId, correlationId = correlationId, replicaId = replicaId) } }