This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 79a608b KAFKA-7211; MM should handle TimeoutException in commitSync
79a608b is described below
commit 79a608b286fd6271b841d3cc7997bf69227de912
Author: huxihx <[email protected]>
AuthorDate: Tue Sep 4 13:55:49 2018 -0700
KAFKA-7211; MM should handle TimeoutException in commitSync
With KIP-266 introduced, MirrorMaker should handle the TimeoutException thrown
by commitSync(). Additionally, MM should only commit offsets for existing topics.
Author: huxihx <[email protected]>
Reviewers: Dong Lin <[email protected]>
Closes #5492 from huxihx/KAFKA-7211
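For orientation before the diff, here is a minimal standalone sketch of the pattern
this patch introduces (a sketch, not the patch itself): retry commitSync() when it
throws the KIP-266 TimeoutException, and prune offsets of topics the consumer can
no longer see so the retry loop cannot get stuck. The offsets map and the name
commitWithRetry are illustrative only.

    import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.errors.TimeoutException
    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object CommitRetrySketch {
      // Illustrative stand-in for ConsumerWrapper.offsets.
      val offsets = new mutable.HashMap[TopicPartition, Long]()

      def commitWithRetry(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = {
        var retryNeeded = true
        while (retryNeeded) {
          try {
            consumer.commitSync(offsets.map { case (tp, offset) =>
              (tp, new OffsetAndMetadata(offset))
            }.asJava)
            offsets.clear()
            retryNeeded = false
          } catch {
            case _: TimeoutException =>
              // KIP-266 made commitSync() time-bounded; drop offsets for topics
              // that are no longer visible, then retry after a short pause.
              val visibleTopics = consumer.listTopics()
              offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
              Thread.sleep(100)
          }
        }
      }
    }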
---
core/src/main/scala/kafka/tools/MirrorMaker.scala | 67 +++++++++++++++-------
.../kafka/tools/MirrorMakerIntegrationTest.scala | 30 ++++++++++
2 files changed, 76 insertions(+), 21 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index d55d96b..b6fd918 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -34,17 +34,18 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.apache.kafka.common.errors.{TimeoutException, WakeupException}
import org.apache.kafka.common.record.RecordBatch
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
+import scala.util.{Failure, Success, Try}
import scala.util.control.ControlThrowable
/**
* The mirror maker has the following architecture:
- * - There are N mirror maker thread shares one ZookeeperConsumerConnector and each owns a Kafka stream.
+ * - There are N mirror maker threads, each of which is equipped with a separate KafkaConsumer instance.
* - All the mirror maker threads share one producer.
* - Each mirror maker thread periodically flushes the producer and then commits all offsets.
*
@@ -70,6 +71,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
private var offsetCommitIntervalMs = 0
private var abortOnSendFailure: Boolean = true
@volatile private var exitingOnSendFailure: Boolean = false
+ private var lastSuccessfulCommitTime = -1L
+ private val time = Time.SYSTEM
// If a message send failed after retries are exhausted. The offset of the messages will also be removed from
// the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -268,24 +271,45 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
consumers.map(consumer => new ConsumerWrapper(consumer, customRebalanceListener, whitelist))
}
- def commitOffsets(consumerWrapper: ConsumerWrapper) {
+ def commitOffsets(consumerWrapper: ConsumerWrapper): Unit = {
if (!exitingOnSendFailure) {
- trace("Committing offsets.")
- try {
- consumerWrapper.commit()
- } catch {
- case e: WakeupException =>
- // we only call wakeup() once to close the consumer,
- // so if we catch it in commit we can safely retry
- // and re-throw to break the loop
+ var retry = 0
+ var retryNeeded = true
+ while (retryNeeded) {
+ trace("Committing offsets.")
+ try {
consumerWrapper.commit()
- throw e
+ lastSuccessfulCommitTime = time.milliseconds
+ retryNeeded = false
+ } catch {
+ case e: WakeupException =>
+ // we only call wakeup() once to close the consumer,
+ // so if we catch it in commit we can safely retry
+ // and re-throw to break the loop
+ commitOffsets(consumerWrapper)
+ throw e
+
+ case _: TimeoutException =>
+ Try(consumerWrapper.consumer.listTopics) match {
+ case Success(visibleTopics) =>
+ consumerWrapper.offsets.retain((tp, _) => visibleTopics.containsKey(tp.topic))
+ case Failure(e) =>
+ warn("Failed to list all authorized topics after committing offsets timed out: ", e)
+ }
- case _: CommitFailedException =>
- warn("Failed to commit offsets because the consumer group has
rebalanced and assigned partitions to " +
- "another instance. If you see this regularly, it could indicate
that you need to either increase " +
- s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or
reduce the number of records " +
- s"handled on each iteration with
${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+ retry += 1
+ warn("Failed to commit offsets because the offset commit request
processing can not be completed in time. " +
+ s"If you see this regularly, it could indicate that you need to
increase the consumer's ${ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG} " +
+ s"Last successful offset commit
timestamp=$lastSuccessfulCommitTime, retry count=$retry")
+ Thread.sleep(100)
+
+ case _: CommitFailedException =>
+ retryNeeded = false
+ warn("Failed to commit offsets because the consumer group has
rebalanced and assigned partitions to " +
+ "another instance. If you see this regularly, it could indicate
that you need to either increase " +
+ s"the consumer's ${ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG} or
reduce the number of records " +
+ s"handled on each iteration with
${ConsumerConfig.MAX_POLL_RECORDS_CONFIG}")
+ }
}
} else {
info("Exiting on send failure, skip committing offsets.")
@@ -423,14 +447,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
// Visible for testing
- private[tools] class ConsumerWrapper(consumer: Consumer[Array[Byte], Array[Byte]],
+ private[tools] class ConsumerWrapper(private[tools] val consumer: Consumer[Array[Byte], Array[Byte]],
customRebalanceListener: Option[ConsumerRebalanceListener],
whitelistOpt: Option[String]) {
val regex = whitelistOpt.getOrElse(throw new IllegalArgumentException("New consumer only supports whitelist."))
var recordIter: java.util.Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = null
// We manually maintain the consumed offsets for historical reasons and it could be simplified
- private val offsets = new HashMap[TopicPartition, Long]()
+ // Visible for testing
+ private[tools] val offsets = new HashMap[TopicPartition, Long]()
def init() {
debug("Initiating consumer")
@@ -474,7 +499,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}
def commit() {
- consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset, ""))}.asJava)
+ consumer.commitSync(offsets.map { case (tp, offset) => (tp, new OffsetAndMetadata(offset)) }.asJava)
offsets.clear()
}
}
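Side note on the commit() cleanup above: it switches to the single-argument
OffsetAndMetadata constructor, which defaults the metadata field to an empty
string, so the committed payload is unchanged. A quick sketch of the equivalence
(assuming only the public OffsetAndMetadata API):

    import org.apache.kafka.clients.consumer.OffsetAndMetadata

    val implicitMeta = new OffsetAndMetadata(42L)      // metadata defaults to ""
    val explicitMeta = new OffsetAndMetadata(42L, "")  // the previous explicit form
    assert(implicitMeta == explicitMeta)               // equal per OffsetAndMetadata.equals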
diff --git a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
index 0a17819..7212b3b 100644
--- a/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/tools/MirrorMakerIntegrationTest.scala
@@ -24,15 +24,45 @@ import kafka.tools.MirrorMaker.{ConsumerWrapper, MirrorMakerProducer}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.junit.Test
+import org.junit.Assert._
class MirrorMakerIntegrationTest extends KafkaServerTestHarness {
override def generateConfigs: Seq[KafkaConfig] =
TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps(_, new Properties()))
+ @Test(expected = classOf[TimeoutException])
+ def testCommitOffsetsThrowTimeoutException(): Unit = {
+ val consumerProps = new Properties
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "1")
+ val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+ val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+ mirrorMakerConsumer.offsets.put(new TopicPartition("test", 0), 0L)
+ mirrorMakerConsumer.commit()
+ }
+
+ @Test
+ def testCommitOffsetsRemoveNonExistentTopics(): Unit = {
+ val consumerProps = new Properties
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+ consumerProps.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000")
+ val consumer = new KafkaConsumer(consumerProps, new ByteArrayDeserializer, new ByteArrayDeserializer)
+ val mirrorMakerConsumer = new ConsumerWrapper(consumer, None, whitelistOpt = Some("any"))
+ mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic1", 0), 0L)
+ mirrorMakerConsumer.offsets.put(new TopicPartition("nonexistent-topic2", 0), 0L)
+ MirrorMaker.commitOffsets(mirrorMakerConsumer)
+ assertTrue("Offsets for non-existent topics should be removed", mirrorMakerConsumer.offsets.isEmpty)
+ }
+
@Test
def testCommaSeparatedRegex(): Unit = {
val topic = "new-topic"