Repository: kafka Updated Branches: refs/heads/trunk 617a91a23 -> 8ed271b82
KAFKA-2718: Prevent temp directory being reused in parallel test runs Use Files.createTempDirectory to avoid reuse, for log directories create a new temp directory as parent Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #583 from rajinisivaram/KAFKA-2718-v2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ed271b8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ed271b8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ed271b8 Branch: refs/heads/trunk Commit: 8ed271b82abd4641b594aff262512bd9eb588263 Parents: 617a91a Author: Rajini Sivaram <[email protected]> Authored: Wed Nov 25 14:20:48 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 25 14:20:48 2015 -0800 ---------------------------------------------------------------------- .../test/scala/other/kafka/StressTestLog.scala | 2 +- .../unit/kafka/log/BrokerCompressionTest.scala | 10 +++----- .../test/scala/unit/kafka/log/CleanerTest.scala | 5 ++-- .../src/test/scala/unit/kafka/log/LogTest.scala | 6 ++--- .../test/scala/unit/kafka/utils/TestUtils.scala | 24 ++++++++++++++------ 5 files changed, 27 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/other/kafka/StressTestLog.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 225d77b..5f0e650 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -32,7 +32,7 @@ object StressTestLog { val running = new AtomicBoolean(true) def main(args: Array[String]) { - val dir = TestUtils.tempDir() + val dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir()) val time = new MockTime val logProprties = new Properties() logProprties.put(LogConfig.SegmentBytesProp, 64*1024*1024: java.lang.Integer) http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala index 6180b87..d0cb4a1 100755 --- a/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala +++ b/core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala @@ -34,18 +34,14 @@ import scala.collection.JavaConversions._ @RunWith(value = classOf[Parameterized]) class BrokerCompressionTest(messageCompression: String, brokerCompression: String) extends JUnitSuite { - var logDir: File = null + val tmpDir = TestUtils.tempDir() + val logDir = TestUtils.randomPartitionLogDir(tmpDir) val time = new MockTime(0) val logConfig = LogConfig() - @Before - def setUp() { - logDir = TestUtils.tempDir() - } - @After def tearDown() { - CoreUtils.rm(logDir) + CoreUtils.rm(tmpDir) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/CleanerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 49869aa..8ab9f91 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -37,7 +37,8 @@ import scala.collection._ */ class CleanerTest extends JUnitSuite { - val dir = TestUtils.tempDir() + val tmpdir = TestUtils.tempDir() + val dir = TestUtils.randomPartitionLogDir(tmpdir) val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer) @@ -48,7 +49,7 @@ class CleanerTest extends JUnitSuite { @After def teardown() { - CoreUtils.rm(dir) + CoreUtils.rm(tmpdir) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7f0d9d6..f4427b9 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -30,21 +30,21 @@ import kafka.server.KafkaConfig class LogTest extends JUnitSuite { - var logDir: File = null + val tmpDir = TestUtils.tempDir() + val logDir = TestUtils.randomPartitionLogDir(tmpDir) val time = new MockTime(0) var config: KafkaConfig = null val logConfig = LogConfig() @Before def setUp() { - logDir = TestUtils.tempDir() val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1) config = KafkaConfig.fromProps(props) } @After def tearDown() { - CoreUtils.rm(logDir) + CoreUtils.rm(tmpDir) } def createEmptyLogs(dir: File, offsets: Int*) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8ed271b8/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2f734f6..88c91f4 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -19,6 +19,7 @@ package kafka.utils import java.io._ import java.nio._ +import java.nio.file.Files import java.nio.channels._ import java.util.Random import java.util.Properties @@ -35,7 +36,6 @@ import org.apache.kafka.test.TestSslUtils import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.I0Itec.zkclient.{ZkClient, ZkConnection} - import kafka.server._ import kafka.producer._ import kafka.message._ @@ -94,11 +94,9 @@ object TestUtils extends Logging { * Create a temporary relative directory */ def tempRelativeDir(parent: String): File = { - new File(parent).mkdirs() - val attempts = 1000 - val f = Iterator.continually(new File(parent, "kafka-" + random.nextInt(1000000))) - .take(attempts).find(_.mkdir()) - .getOrElse(sys.error(s"Failed to create directory after $attempts attempts")) + val parentFile = new File(parent) + parentFile.mkdirs() + val f = Files.createTempDirectory(parentFile.toPath, "kafka-").toFile f.deleteOnExit() Runtime.getRuntime().addShutdownHook(new Thread() { @@ -106,7 +104,19 @@ object TestUtils extends Logging { CoreUtils.rm(f) } }) - + f + } + + /** + * Create a random log directory in the format <string>-<int> used for Kafka partition logs. + * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory. + */ + def randomPartitionLogDir(parentDir: File): File = { + val attempts = 1000 + val f = Iterator.continually(new File(parentDir, "kafka-" + random.nextInt(1000000))) + .take(attempts).find(_.mkdir()) + .getOrElse(sys.error(s"Failed to create directory after $attempts attempts")) + f.deleteOnExit() f }
