Repository: kafka Updated Branches: refs/heads/trunk 72eebad43 -> 836cb1963
KAFKA-3063; LogRecoveryTest causes JVM to exit occasionally Remove deletion of tmp file in `OffsetCheckpoint`'s constructor. This delete causes unintuitive behaviour like `LogRecoveryTest` causing a `System.exit` because the test creates an instance of `OffsetCheckpoint` in order to call `read()` on it (while unexpectedly deleting a file being written by another instance of `OffsetCheckpoint`). Also: * Improve error-handling in `OffsetCheckpoint` * Also include minor performance improvements in `read()` * Minor clean-ups to `ReplicaManager` and `LogRecoveryTest` Author: Ismael Juma <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #759 from ijuma/kafka-3063-log-recovery-test-exits-jvm Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/836cb196 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/836cb196 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/836cb196 Branch: refs/heads/trunk Commit: 836cb1963330a9e342379899e0fe52b72347736e Parents: 72eebad Author: Ismael Juma <[email protected]> Authored: Tue Jan 12 16:16:10 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jan 12 16:16:10 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/common/utils/Utils.java | 23 ++++++ .../scala/kafka/server/OffsetCheckpoint.scala | 82 ++++++++++---------- .../scala/kafka/server/ReplicaManager.scala | 6 +- .../unit/kafka/server/LogRecoveryTest.scala | 18 ++--- 4 files changed, 78 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index e725722..8df54a4 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -23,6 +23,9 @@ import java.io.StringWriter; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -659,4 +662,24 @@ public class Utils { return cl; } + /** + * Attempts to move source to target atomically and falls back to a non-atomic move if it fails. + * + * @throws IOException if both atomic and non-atomic moves fail + */ + public static void atomicMoveWithFallback(Path source, Path target) throws IOException { + try { + Files.move(source, target, StandardCopyOption.ATOMIC_MOVE); + } catch (IOException outer) { + try { + Files.move(source, target, StandardCopyOption.REPLACE_EXISTING); + log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to " + + outer.getMessage()); + } catch (IOException inner) { + inner.addSuppressed(outer); + throw inner; + } + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/OffsetCheckpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index 8c5b054..fe1d823 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -16,95 +16,99 @@ */ package kafka.server +import java.nio.file.Paths +import java.util.regex.Pattern + +import org.apache.kafka.common.utils.Utils + import scala.collection._ import kafka.utils.Logging import kafka.common._ import java.io._ +object 
OffsetCheckpoint { + private val WhiteSpacesPattern = Pattern.compile("\\s+") + private val CurrentVersion = 0 +} + /** * This class saves out a map of topic/partition=>offsets to a file */ class OffsetCheckpoint(val file: File) extends Logging { + import OffsetCheckpoint._ + private val path = file.toPath.toAbsolutePath + private val tempPath = Paths.get(path.toString + ".tmp") private val lock = new Object() - new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness file.createNewFile() // in case the file doesn't exist def write(offsets: Map[TopicAndPartition, Long]) { lock synchronized { // write to temp file and then swap with the existing file - val temp = new File(file.getAbsolutePath + ".tmp") - - val fileOutputStream = new FileOutputStream(temp) + val fileOutputStream = new FileOutputStream(tempPath.toFile) val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) try { - // write the current version - writer.write(0.toString) + writer.write(CurrentVersion.toString) writer.newLine() - - // write the number of entries + writer.write(offsets.size.toString) writer.newLine() - // write the entries offsets.foreach { case (topicPart, offset) => - writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset)) + writer.write(s"${topicPart.topic} ${topicPart.partition} $offset") writer.newLine() } - - // flush the buffer and then fsync the underlying file + writer.flush() fileOutputStream.getFD().sync() } finally { writer.close() } - - // swap new offset checkpoint file with previous one - if(!temp.renameTo(file)) { - // renameTo() fails on Windows if the destination file exists. 
- file.delete() - if(!temp.renameTo(file)) - throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) - } + + Utils.atomicMoveWithFallback(tempPath, path) } } def read(): Map[TopicAndPartition, Long] = { + + def malformedLineException(line: String) = + throw new IOException(s"Malformed line in offset checkpoint file: '$line'") + lock synchronized { val reader = new BufferedReader(new FileReader(file)) + var line: String = null try { - var line = reader.readLine() - if(line == null) + line = reader.readLine() + if (line == null) return Map.empty val version = line.toInt version match { - case 0 => + case CurrentVersion => line = reader.readLine() - if(line == null) + if (line == null) return Map.empty val expectedSize = line.toInt - var offsets = Map[TopicAndPartition, Long]() + val offsets = mutable.Map[TopicAndPartition, Long]() line = reader.readLine() - while(line != null) { - val pieces = line.split("\\s+") - if(pieces.length != 3) - throw new IOException("Malformed line in offset checkpoint file: '%s'.".format(line)) - - val topic = pieces(0) - val partition = pieces(1).toInt - val offset = pieces(2).toLong - offsets += (TopicAndPartition(topic, partition) -> offset) - line = reader.readLine() + while (line != null) { + WhiteSpacesPattern.split(line) match { + case Array(topic, partition, offset) => + offsets += TopicAndPartition(topic, partition.toInt) -> offset.toLong + line = reader.readLine() + case _ => throw malformedLineException(line) + } } - if(offsets.size != expectedSize) - throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size)) + if (offsets.size != expectedSize) + throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}") offsets - case _ => + case _ => throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version) } + } catch { + case e: NumberFormatException => malformedLineException(line) }
finally { reader.close() } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5b1276e..d1e549d 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -869,10 +869,10 @@ class ReplicaManager(val config: KafkaConfig, // Flushes the highwatermark value for all partitions to the highwatermark file def checkpointHighWatermarks() { - val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica} + val replicas = allPartitions.values.flatMap(_.getReplica(config.brokerId)) val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath) - for((dir, reps) <- replicasByDir) { - val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap + for ((dir, reps) <- replicasByDir) { + val hwms = reps.map(r => new TopicAndPartition(r) -> r.highWatermark.messageOffset).toMap try { highWatermarkCheckpoints(dir).write(hwms) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/836cb196/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 7a434aa..d11c40f 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -55,9 +55,9 @@ class LogRecoveryTest extends ZooKeeperTestHarness { val message = "hello" var producer: Producer[Int, String] = null - def hwFile1: OffsetCheckpoint = new 
OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) - def hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) - var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] + def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) + def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) + var servers = Seq.empty[KafkaServer] // Some tests restart the brokers then produce more data. But since test brokers use random ports, we need // to use a new producer that knows the new ports @@ -81,7 +81,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { servers = List(server1, server2) // create topic with 1 partition, 2 replicas, one on each broker - createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) + createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(0,1)), servers = servers) // create the producer updateProducer() @@ -90,7 +90,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { @After override def tearDown() { producer.close() - for(server <- servers) { + for (server <- servers) { server.shutdown() CoreUtils.rm(server.config.logDirs(0)) } @@ -107,7 +107,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages, "Failed to update high watermark for follower after timeout") - servers.foreach(server => server.replicaManager.checkpointHighWatermarks()) + servers.foreach(_.replicaManager.checkpointHighWatermarks()) val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(numMessages, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) @@ -160,7 +160,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.replicaManager.getReplica(topic, 
0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed - servers.foreach(server => server.shutdown()) + servers.foreach(_.shutdown()) assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) } @@ -174,7 +174,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed - servers.foreach(server => server.shutdown()) + servers.foreach(_.shutdown()) val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L) assertEquals(hw, leaderHW) val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L) @@ -224,7 +224,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw, "Failed to update high watermark for follower after timeout") // shutdown the servers to allow the hw to be checkpointed - servers.foreach(server => server.shutdown()) + servers.foreach(_.shutdown()) assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)) assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)) }
