Repository: kafka Updated Branches: refs/heads/trunk ba86f0a25 -> 5c9040745
http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala index 3fd5a53..c31f884 100644 --- a/core/src/test/scala/unit/kafka/log/LogConfigTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogConfigTest.scala @@ -26,22 +26,9 @@ import org.scalatest.junit.JUnit3Suite class LogConfigTest extends JUnit3Suite { @Test - def testFromPropsDefaults() { - val defaults = new Properties() - defaults.put(LogConfig.SegmentBytesProp, "4242") - val props = new Properties(defaults) - - val config = LogConfig.fromProps(props) - - Assert.assertEquals(4242, config.segmentSize) - Assert.assertEquals("LogConfig defaults should be retained", Defaults.MaxMessageSize, config.maxMessageSize) - Assert.assertEquals("producer", config.compressionType) - } - - @Test def testFromPropsEmpty() { val p = new Properties() - val config = LogConfig.fromProps(p) + val config = LogConfig(p) Assert.assertEquals(LogConfig(), config) } @@ -62,7 +49,7 @@ class LogConfigTest extends JUnit3Suite { } }) - val actual = LogConfig.fromProps(expected).toProps + val actual = LogConfig(expected).originals Assert.assertEquals(expected, actual) } @@ -86,7 +73,7 @@ class LogConfigTest extends JUnit3Suite { val props = new Properties props.setProperty(name, value.toString) intercept[ConfigException] { - LogConfig.fromProps(props) + LogConfig(props) } }) } http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 01dfbc4..a13f2be 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io._ +import java.util.Properties import junit.framework.Assert._ import org.junit.Test import org.scalatest.junit.JUnit3Suite @@ -30,7 +31,11 @@ class LogManagerTest extends JUnit3Suite { val time: MockTime = new MockTime() val maxRollInterval = 100 val maxLogAgeMs = 10*60*60*1000 - val logConfig = LogConfig(segmentSize = 1024, maxIndexSize = 4096, retentionMs = maxLogAgeMs) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) + logProps.put(LogConfig.RetentionMsProp, maxLogAgeMs: java.lang.Integer) + val logConfig = LogConfig(logProps) var logDir: File = null var logManager: LogManager = null val name = "kafka" @@ -113,8 +118,11 @@ class LogManagerTest extends JUnit3Suite { def testCleanupSegmentsToMaintainSize() { val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes logManager.shutdown() + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 10 * setSize: java.lang.Integer) + logProps.put(LogConfig.RetentionBytesProp, 5L * 10L * setSize + 10L: java.lang.Long) + val config = LogConfig.fromProps(logConfig.originals, logProps) - val config = logConfig.copy(segmentSize = 10 * setSize, retentionSize = 5L * 10L * setSize + 10L) logManager = createLogManager() logManager.startup @@ -154,7 +162,10 @@ class LogManagerTest extends JUnit3Suite { @Test def testTimeBasedFlush() { logManager.shutdown() - val config = logConfig.copy(flushMs = 1000) + val logProps = new Properties() + logProps.put(LogConfig.FlushMsProp, 1000: java.lang.Integer) + val config = LogConfig.fromProps(logConfig.originals, logProps) + logManager = createLogManager() logManager.startup val log = logManager.createLog(TopicAndPartition(name, 0), config) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 8e095d6..a8e57c2 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io._ +import java.util.Properties import java.util.concurrent.atomic._ import junit.framework.Assert._ import org.scalatest.junit.JUnitSuite @@ -61,9 +62,12 @@ class LogTest extends JUnitSuite { def testTimeBasedLogRoll() { val set = TestUtils.singleMessageSet("test".getBytes()) + val logProps = new Properties() + logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long) + // create a log val log = new Log(logDir, - logConfig.copy(segmentMs = 1 * 60 * 60L), + LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time) @@ -96,9 +100,12 @@ class LogTest extends JUnitSuite { val set = TestUtils.singleMessageSet("test".getBytes()) val maxJitter = 20 * 60L + val logProps = new Properties() + logProps.put(LogConfig.SegmentMsProp, 1 * 60 * 60: java.lang.Long) + logProps.put(LogConfig.SegmentJitterMsProp, maxJitter: java.lang.Long) // create a log val log = new Log(logDir, - logConfig.copy(segmentMs = 1 * 60 * 60L, segmentJitterMs = maxJitter), + LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time) @@ -123,8 +130,10 @@ class LogTest extends JUnitSuite { val msgPerSeg = 10 val segmentSize = msgPerSeg * (setSize - 1) // each segment will be 10 messages + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) // segments expire in size @@ -149,7 +158,9 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val messages = (0 until 100 by 2).map(id => new Message(id.toString.getBytes)).toArray for(i <- 0 until messages.length) @@ -168,7 +179,9 @@ class LogTest extends JUnitSuite { */ @Test def testAppendAndReadWithNonSequentialOffsets() { - val log = new Log(logDir, logConfig.copy(segmentSize = 71), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 71: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val messageIds = ((0 until 50) ++ (50 until 200 by 7)).toArray val messages = messageIds.map(id => new Message(id.toString.getBytes)) @@ -191,7 +204,9 @@ class LogTest extends JUnitSuite { */ @Test def testReadAtLogGap() { - val log = new Log(logDir, logConfig.copy(segmentSize = 300), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) // keep appending until we have two segments with only a single message in the second segment while(log.numberOfSegments == 1) @@ -211,7 +226,9 @@ class LogTest extends JUnitSuite { @Test def testReadOutOfRange() { createEmptyLogs(logDir, 1024) - val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes) try { log.read(0, 1024) @@ -234,7 +251,9 @@ class LogTest extends JUnitSuite { @Test def testLogRolls() { /* create a multipart log with 100 messages */ - val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) val numMessages = 100 val messageSets = (0 until numMessages).map(i => TestUtils.singleMessageSet(i.toString.getBytes)) messageSets.foreach(log.append(_)) @@ -263,7 +282,9 @@ class LogTest extends JUnitSuite { @Test def testCompressedMessages() { /* this log should roll after every messageset */ - val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) /* append 2 compressed message sets, each with two messages giving offsets 0, 1, 2, 3 */ log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) @@ -286,7 +307,9 @@ class LogTest extends JUnitSuite { for(messagesToAppend <- List(0, 1, 25)) { logDir.mkdirs() // first test a log segment starting at 0 - val log = new Log(logDir, logConfig.copy(segmentSize = 100), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) for(i <- 0 until messagesToAppend) log.append(TestUtils.singleMessageSet(i.toString.getBytes)) @@ -318,7 +341,9 @@ class LogTest extends JUnitSuite { val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new Message ("You".getBytes), new Message("bethe".getBytes)) // append messages to log val configSegmentSize = messageSet.sizeInBytes - 1 - val log = new Log(logDir, logConfig.copy(segmentSize = configSegmentSize), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, configSegmentSize: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) try { log.append(messageSet) @@ -342,7 +367,10 @@ class LogTest extends JUnitSuite { val messageSetWithKeyedMessage = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage) val messageSetWithKeyedMessages = new ByteBufferMessageSet(NoCompressionCodec, keyedMessage, anotherKeyedMessage) - val log = new Log(logDir, logConfig.copy(compact = true), recoveryPoint = 0L, time.scheduler, time) + val logProps = new Properties() + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time) try { log.append(messageSetWithUnkeyedMessage) @@ -380,7 +408,9 @@ class LogTest extends JUnitSuite { // append messages to log val maxMessageSize = second.sizeInBytes - 1 - val log = new Log(logDir, logConfig.copy(maxMessageSize = maxMessageSize), recoveryPoint = 0L, time.scheduler, time = time) + val logProps = new Properties() + logProps.put(LogConfig.MaxMessageBytesProp, maxMessageSize: java.lang.Integer) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time = time) // should be able to append the small message log.append(first) @@ -401,7 +431,11 @@ class LogTest extends JUnitSuite { val messageSize = 100 val segmentSize = 7 * messageSize val indexInterval = 3 * messageSize - val config = logConfig.copy(segmentSize = segmentSize, indexInterval = indexInterval, maxIndexSize = 4096) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, indexInterval: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 4096: java.lang.Integer) + val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize))) @@ -432,7 +466,11 @@ class LogTest extends JUnitSuite { def testIndexRebuild() { // publish the messages and close the log val numMessages = 200 - val config = logConfig.copy(segmentSize = 200, indexInterval = 1) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 200: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + + val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time) for(i <- 0 until numMessages) log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10))) @@ -460,8 +498,11 @@ class LogTest extends JUnitSuite { val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + // create a log - val log = new Log(logDir, logConfig.copy(segmentSize = segmentSize), recoveryPoint = 0L, scheduler = time.scheduler, time = time) + val log = new Log(logDir, LogConfig(logProps), recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -513,7 +554,9 @@ class LogTest extends JUnitSuite { val setSize = set.sizeInBytes val msgPerSeg = 10 val segmentSize = msgPerSeg * setSize // each segment will be 10 messages - val config = logConfig.copy(segmentSize = segmentSize) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer) + val config = LogConfig(logProps) val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time) assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments) for (i<- 1 to msgPerSeg) @@ -540,10 +583,12 @@ class LogTest extends JUnitSuite { val bogusIndex2 = Log.indexFilename(logDir, 5) val set = TestUtils.singleMessageSet("test".getBytes()) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) val log = new Log(logDir, - logConfig.copy(segmentSize = set.sizeInBytes * 5, - maxIndexSize = 1000, - indexInterval = 1), + LogConfig(logProps), recoveryPoint = 0L, time.scheduler, time) @@ -564,9 +609,11 @@ class LogTest extends JUnitSuite { @Test def testReopenThenTruncate() { val set = TestUtils.singleMessageSet("test".getBytes()) - val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, - maxIndexSize = 1000, - indexInterval = 10000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer) + val config = LogConfig(logProps) // create a log var log = new Log(logDir, @@ -596,10 +643,13 @@ class LogTest extends JUnitSuite { def testAsyncDelete() { val set = TestUtils.singleMessageSet("test".getBytes()) val asyncDeleteMs = 1000 - val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, - fileDeleteDelayMs = asyncDeleteMs, - maxIndexSize = 1000, - indexInterval = 10000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 10000: java.lang.Integer) + logProps.put(LogConfig.FileDeleteDelayMsProp, asyncDeleteMs: java.lang.Integer) + val config = LogConfig(logProps) + val log = new Log(logDir, config, recoveryPoint = 0L, @@ -634,7 +684,10 @@ class LogTest extends JUnitSuite { @Test def testOpenDeletesObsoleteFiles() { val set = TestUtils.singleMessageSet("test".getBytes()) - val config = logConfig.copy(segmentSize = set.sizeInBytes * 5, maxIndexSize = 1000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, set.sizeInBytes * 5: java.lang.Integer) + logProps.put(LogConfig.SegmentIndexBytesProp, 1000: java.lang.Integer) + val config = LogConfig(logProps) var log = new Log(logDir, config, recoveryPoint = 0L, @@ -672,7 +725,11 @@ class LogTest extends JUnitSuite { @Test def testCorruptLog() { // append some messages to create some segments - val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) + val config = LogConfig(logProps) val set = TestUtils.singleMessageSet("test".getBytes()) val recoveryPoint = 50L for(iteration <- 0 until 50) { @@ -704,7 +761,11 @@ class LogTest extends JUnitSuite { @Test def testCleanShutdownFile() { // append some messages to create some segments - val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000) + val logProps = new Properties() + logProps.put(LogConfig.SegmentBytesProp, 1000: java.lang.Integer) + logProps.put(LogConfig.MaxMessageBytesProp, 64*1024: java.lang.Integer) + logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer) + val config = LogConfig(logProps) val set = TestUtils.singleMessageSet("test".getBytes()) val parentLogDir = logDir.getParentFile assertTrue("Data directory %s must exist", parentLogDir.isDirectory) http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 7877f6c..8a871cf 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -16,6 +16,8 @@ */ package kafka.server +import java.util.Properties + import junit.framework.Assert._ import org.junit.Test import kafka.integration.KafkaServerTestHarness @@ -30,16 +32,19 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testConfigChange() { - val oldVal = 100000 - val newVal = 200000 + val oldVal: java.lang.Long = 100000 + val newVal: java.lang.Long = 200000 val tp = TopicAndPartition("test", 0) - AdminUtils.createTopic(zkClient, tp.topic, 1, 1, LogConfig(flushInterval = oldVal).toProps) + val logProps = new Properties() + logProps.put(LogConfig.FlushMessagesProp, oldVal.toString) + AdminUtils.createTopic(zkClient, tp.topic, 1, 1, logProps) TestUtils.retry(10000) { val logOpt = this.servers(0).logManager.getLog(tp) assertTrue(logOpt.isDefined) assertEquals(oldVal, logOpt.get.config.flushInterval) } - AdminUtils.changeTopicConfig(zkClient, tp.topic, LogConfig(flushInterval = newVal).toProps) + logProps.put(LogConfig.FlushMessagesProp, newVal.toString) + AdminUtils.changeTopicConfig(zkClient, tp.topic, logProps) TestUtils.retry(10000) { assertEquals(newVal, this.servers(0).logManager.getLog(tp).get.config.flushInterval) } @@ -49,7 +54,9 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness { def testConfigChangeOnNonExistingTopic() { val topic = TestUtils.tempTopic try { - AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps) + val logProps = new Properties() + logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer) + AdminUtils.changeTopicConfig(zkClient, topic, logProps) fail("Should fail with AdminOperationException for topic doesn't exist") } catch { case e: AdminOperationException => // expected http://git-wip-us.apache.org/repos/asf/kafka/blob/5c904074/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index c487f36..8268852 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -30,29 +30,13 @@ import scala.util.Random._ class KafkaConfigConfigDefTest extends JUnit3Suite { @Test - def testFromPropsDefaults() { - val defaults = new Properties() - defaults.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") - - // some ordinary setting - defaults.put(KafkaConfig.AdvertisedPortProp, "1818") - - val props = new Properties(defaults) - - val config = KafkaConfig.fromProps(props) - - Assert.assertEquals(1818, config.advertisedPort) - Assert.assertEquals("KafkaConfig defaults should be retained", Defaults.ConnectionsMaxIdleMs, config.connectionsMaxIdleMs) - } - - @Test def testFromPropsEmpty() { // only required val p = new Properties() p.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181") val actualConfig = KafkaConfig.fromProps(p) - val expectedConfig = new KafkaConfig(zkConnect = "127.0.0.1:2181") + val expectedConfig = new KafkaConfig(p) Assert.assertEquals(expectedConfig.zkConnect, actualConfig.zkConnect) Assert.assertEquals(expectedConfig.zkSessionTimeoutMs, actualConfig.zkSessionTimeoutMs) @@ -252,7 +236,7 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { } }) - val actual = KafkaConfig.fromProps(expected).toProps + val actual = KafkaConfig.fromProps(expected).originals Assert.assertEquals(expected, actual) }