Repository: samza Updated Branches: refs/heads/master 343712e30 -> fd9f0802f
SAMZA-1465: Performance regression for KafkaCheckpointManager Author: Jacob Maes <[email protected]> Reviewers: Prateek Maheshwari <[email protected]>, Boris Shkolnik <[email protected]>, Fred Ji <[email protected]> Closes #331 from jmakes/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fd9f0802 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fd9f0802 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fd9f0802 Branch: refs/heads/master Commit: fd9f0802f9617b03542ee4f42ab8966e6c6df145 Parents: 343712e Author: Jacob Maes <[email protected]> Authored: Wed Oct 18 15:30:34 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Wed Oct 18 15:30:34 2017 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 105 +++++++------------ 1 file changed, 40 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fd9f0802/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 4eb6666..75b4700 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -43,6 +43,9 @@ import scala.collection.mutable * keyed to that taskName. If there is no such message, no checkpoint data * exists. The underlying log has a single partition into which all * checkpoints and TaskName to changelog partition mappings are written. + * + * This class is thread safe for writing but not for reading checkpoints. + * This is currently OK since checkpoints are only read on the main thread. */ class KafkaCheckpointManager( clientId: String, @@ -64,14 +67,14 @@ class KafkaCheckpointManager( checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging { var taskNames = Set[TaskName]() - @volatile var systemProducer: SystemProducer = null + @volatile var systemProducer: SystemProducer = null + var systemConsumer: SystemConsumer = null var taskNamesToOffsets: Map[TaskName, Checkpoint] = null val systemAdmin = getSystemAdmin() val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk) - KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString) info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName)) @@ -103,7 +106,7 @@ class KafkaCheckpointManager( systemProducer.send(taskName.getTaskName, envelope) systemProducer.flush(taskName.getTaskName) // make sure it is written - info("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) ) + debug("Completed writing checkpoint=%s into %s topic for system %s." format(checkpoint, checkpointTopic, systemName) ) loop.done }, @@ -181,73 +184,40 @@ class KafkaCheckpointManager( */ private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean, handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = { - - val UNKNOWN_OFFSET = "-1" - var attempts = 10 - val POLL_TIMEOUT = 1000L + info("Reading from checkpoint system:%s topic:%s" format(systemName, checkpointTopic)) val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0)) - val systemConsumer = getSystemConsumer() - val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0)) - // offsets returned are strings - val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset - val oldestOffset = partitionMetadata.getOldestOffset - systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning - systemConsumer.start() - var msgCount = 0 - try { - val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] - // convert offsets to long - var currentOffset = UNKNOWN_OFFSET.toLong - val newestOffsetLong = newestOffset.toLong - val sspToPoll = Collections.singleton(ssp) - while (currentOffset < newestOffsetLong) { - - val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] = - try { - systemConsumer.poll(sspToPoll, POLL_TIMEOUT) - } catch { - case e: Exception => { - // these exceptions are most likely intermediate - warn("Got %s exception while polling the consumer for checkpoints." format e) - if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e) - attempts -= 1 - emptyEnvelopes - } - } + if (systemConsumer == null) { + val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0)) + val oldestOffset = partitionMetadata.getOldestOffset - val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp) - val messagesNum = if (messages != null) messages.size else 0 - info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s" - format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset)) - if (envelopes.isEmpty || messagesNum <= 0) { - info("Got empty/null list of messages") - } else { - msgCount += messages.size() - // check the key - for (msg: IncomingMessageEnvelope <- messages) { - val key = msg.getKey.asInstanceOf[Array[Byte]] - currentOffset = msg.getOffset().toLong - if (key == null) { - throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key." - format currentOffset) - } + systemConsumer = getSystemConsumer() + systemConsumer.register(ssp, oldestOffset) + systemConsumer.start() + } - val checkpointKey = KafkaCheckpointLogKey.fromBytes(key) + val iterator = new SystemStreamPartitionIterator(systemConsumer, ssp); + var msgCount = 0 + while (iterator.hasNext) { + val msg = iterator.next + msgCount += 1 + + val offset = msg.getOffset + val key = msg.getKey.asInstanceOf[Array[Byte]] + if (key == null) { + throw new KafkaUtilException( + "While reading checkpoint (currentOffset=%s) stream encountered message without key." format offset) + } - if (!shouldHandleEntry(checkpointKey)) { - info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey)) - } else { - // handleEntry requires ByteBuffer - val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]]) - handleEntry(checkpointPayload, checkpointKey) - } - } - } + val checkpointKey = KafkaCheckpointLogKey.fromBytes(key) + + if (!shouldHandleEntry(checkpointKey)) { + info("Skipping checkpoint log entry at offset %s with key %s." format(offset, checkpointKey)) + } else { + val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]]) + handleEntry(checkpointPayload, checkpointKey) } - } finally { - systemConsumer.stop() } info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic)) } @@ -282,12 +252,17 @@ class KafkaCheckpointManager( def stop = { - synchronized ( + synchronized { if (systemProducer != null) { systemProducer.stop systemProducer = null } - ) + + if (systemConsumer != null) { + systemConsumer.stop + systemConsumer = null + } + } }
