http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
index 6482529..551d820 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
@@ -35,18 +35,22 @@ import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(value = Parameterized.class)
 public class RecordTest {
 
+    private byte magic;
     private long timestamp;
     private ByteBuffer key;
     private ByteBuffer value;
     private CompressionType compression;
+    private TimestampType timestampType;
     private Record record;
 
-    public RecordTest(long timestamp, byte[] key, byte[] value, CompressionType compression) {
+    public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
+        this.magic = magic;
         this.timestamp = timestamp;
+        this.timestampType = TimestampType.CREATE_TIME;
         this.key = key == null ? null : ByteBuffer.wrap(key);
         this.value = value == null ? null : ByteBuffer.wrap(value);
         this.compression = compression;
-        this.record = new Record(timestamp, key, value, compression);
+        this.record = Record.create(magic, timestamp, key, value, compression, timestampType);
     }
 
     @Test
@@ -56,22 +60,33 @@ public class RecordTest {
         assertEquals(key, record.key());
         if (key != null)
             assertEquals(key.limit(), record.keySize());
-        assertEquals(Record.CURRENT_MAGIC_VALUE, record.magic());
+        assertEquals(magic, record.magic());
         assertEquals(value, record.value());
         if (value != null)
             assertEquals(value.limit(), record.valueSize());
+        if (magic > 0) {
+            assertEquals(timestamp, record.timestamp());
+            assertEquals(timestampType, record.timestampType());
+        } else {
+            assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+            assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
+        }
     }
 
     @Test
     public void testChecksum() {
         assertEquals(record.checksum(), record.computeChecksum());
+
+        byte attributes = Record.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
         assertEquals(record.checksum(), Record.computeChecksum(
-            this.timestamp,
-            this.key == null ? null : this.key.array(),
-            this.value == null ? null : this.value.array(),
-            this.compression, 0, -1));
+            magic,
+            attributes,
+            this.timestamp,
+            this.key == null ? null : this.key.array(),
+            this.value == null ? null : this.value.array()
+        ));
         assertTrue(record.isValid());
-        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.size(); i++) {
+        for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.sizeInBytes(); i++) {
             Record copy = copyOf(record);
             copy.buffer().put(i, (byte) 69);
             assertFalse(copy.isValid());
@@ -85,7 +100,7 @@ public class RecordTest {
     }
 
     private Record copyOf(Record record) {
-        ByteBuffer buffer = ByteBuffer.allocate(record.size());
+        ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
         record.buffer().put(buffer);
         buffer.rewind();
         record.buffer().rewind();
@@ -101,12 +116,13 @@ public class RecordTest {
     public static Collection<Object[]> data() {
         byte[] payload = new byte[1000];
         Arrays.fill(payload, (byte) 1);
-        List<Object[]> values = new ArrayList<Object[]>();
-        for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
-            for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
-                for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
-                    for (CompressionType compression : CompressionType.values())
-                        values.add(new Object[] {timestamp, key, value, compression});
+        List<Object[]> values = new ArrayList<>();
+        for (byte magic : Arrays.asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+            for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
+                for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
+                    for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
+                        for (CompressionType compression : CompressionType.values())
+                            values.add(new Object[] {magic, timestamp, key, value, compression});
        return values;
    }
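
[Editor's note] For reference, the new client-side record API exercised by the tests above and below can be used roughly as follows. This is a minimal sketch based only on calls that appear in this patch (Record.create, Record#convert, MemoryRecords.builder, MemoryRecordsBuilder#append/build); the wrapper class RecordApiSketch and the buffer size are illustrative, and exact overloads may differ in later Kafka versions.

import java.nio.ByteBuffer;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;

public class RecordApiSketch {
    public static void main(String[] args) {
        // Magic v0 records carry no timestamp; NO_TIMESTAMP is used as a placeholder.
        Record v0 = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP,
                "key".getBytes(), "value".getBytes());

        // Magic v1 records store a timestamp plus a timestamp type in the attributes byte.
        Record v1 = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(),
                "key".getBytes(), "value".getBytes(), CompressionType.NONE, TimestampType.CREATE_TIME);

        // Up-converting a v0 record keeps key and value, but there is no timestamp to carry over.
        Record upConverted = v0.convert(Record.MAGIC_VALUE_V1);
        System.out.println(upConverted.timestamp() == Record.NO_TIMESTAMP);      // true
        System.out.println(v1.timestampType() == TimestampType.CREATE_TIME);     // true

        // MemoryRecordsBuilder replaces ByteBufferMessageSet for assembling a set of records.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME);
        builder.append(0L, v1);
        MemoryRecords records = builder.build();
        System.out.println(records.sizeInBytes());
    }
}
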
http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
index aabadfe..427c743 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
@@ -20,35 +20,29 @@ import org.junit.Test;
 
 import java.nio.ByteBuffer;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class SimpleRecordTest {
 
     /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
-    @Test
+    @Test(expected = InvalidRecordException.class)
     public void testIsValidWithTooSmallBuffer() {
         ByteBuffer buffer = ByteBuffer.allocate(2);
         Record record = new Record(buffer);
         assertFalse(record.isValid());
-        try {
-            record.ensureValid();
-            fail("InvalidRecordException should have been thrown");
-        } catch (InvalidRecordException e) { }
+        record.ensureValid();
     }
 
-    @Test
+    @Test(expected = InvalidRecordException.class)
     public void testIsValidWithChecksumMismatch() {
         ByteBuffer buffer = ByteBuffer.allocate(4);
         // set checksum
         buffer.putInt(2);
         Record record = new Record(buffer);
         assertFalse(record.isValid());
-        try {
-            record.ensureValid();
-            fail("InvalidRecordException should have been thrown");
-        } catch (InvalidRecordException e) { }
+        record.ensureValid();
     }
 
     @Test
@@ -63,4 +57,40 @@ public class SimpleRecordTest {
         record.ensureValid();
     }
 
+    @Test
+    public void testConvertFromV0ToV1() {
+        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+        for (int i = 0; i < keys.length; i++) {
+            Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
+            Record converted = record.convert(Record.MAGIC_VALUE_V1);
+
+            assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
+            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(record.key(), converted.key());
+            assertEquals(record.value(), converted.value());
+            assertTrue(record.isValid());
+            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V1), converted.sizeInBytes());
+        }
+    }
+
+    @Test
+    public void testConvertFromV1ToV0() {
+        byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
+        byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
+
+        for (int i = 0; i < keys.length; i++) {
+            Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
+            Record converted = record.convert(Record.MAGIC_VALUE_V0);
+
+            assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
+            assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
+            assertEquals(record.key(), converted.key());
+            assertEquals(record.value(), converted.value());
+            assertTrue(record.isValid());
+            assertEquals(record.convertedSize(Record.MAGIC_VALUE_V0), converted.sizeInBytes());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
new file mode 100644
index 0000000..4759715
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimestampTypeTest {
+
+    @Test
+    public void toAndFromAttributesCreateTime() {
+        byte attributes = TimestampType.CREATE_TIME.updateAttributes((byte) 0);
+        assertEquals(TimestampType.CREATE_TIME, TimestampType.forAttributes(attributes));
+    }
+
+    @Test
+    public void toAndFromAttributesLogAppendTime() {
+        byte attributes = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
+        assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index d3280e5..4e80b61 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -23,8 +23,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Utils;
 
 import javax.xml.bind.DatatypeConverter;
@@ -35,6 +37,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -46,6 +49,7 @@ import java.util.regex.Pattern;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -185,13 +189,13 @@ public class TestUtils {
     public static ByteBuffer partitionRecordsBuffer(final long offset, final CompressionType compressionType, final Record... records) {
         int bufferSize = 0;
         for (final Record record : records)
-            bufferSize += Records.LOG_OVERHEAD + record.size();
+            bufferSize += Records.LOG_OVERHEAD + record.sizeInBytes();
         final ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-        final MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, compressionType, TimestampType.CREATE_TIME);
+        long nextOffset = offset;
         for (final Record record : records)
-            memoryRecords.append(offset, record);
-        memoryRecords.close();
-        return memoryRecords.buffer();
+            builder.append(nextOffset++, record);
+        return builder.build().buffer();
     }
 
     public static Properties producerConfig(final String bootstrapServers,
@@ -309,4 +313,22 @@ public class TestUtils {
             fail(clusterId + " cannot be converted back to UUID.");
         }
     }
+
+    /**
+     * Throw an exception if the two iterators are of differing lengths or contain
+     * different messages on their Nth element
+     */
+    public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) {
+        while (s1.hasNext() && s2.hasNext())
+            assertEquals(s1.next(), s2.next());
+        assertFalse("Iterators have uneven length--first has more", s1.hasNext());
+        assertFalse("Iterators have uneven length--second has more", s2.hasNext());
+    }
+
+    public static <T> List<T> toList(Iterator<T> iterator) {
+        List<T> res = new ArrayList<>();
+        while (iterator.hasNext())
+            res.add(iterator.next());
+        return res;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 895c1b1..4052639 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
 package kafka.api
 
-import kafka.message.Message
+import org.apache.kafka.common.record.Record
 
 /**
  * This class contains the different Kafka versions.
@@ -87,54 +87,54 @@ sealed trait ApiVersion extends Ordered[ApiVersion] { // Keep the IDs in order of versions case object KAFKA_0_8_0 extends ApiVersion { val version: String = "0.8.0.X" - val messageFormatVersion: Byte = Message.MagicValue_V0 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0 val id: Int = 0 } case object KAFKA_0_8_1 extends ApiVersion { val version: String = "0.8.1.X" - val messageFormatVersion: Byte = Message.MagicValue_V0 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0 val id: Int = 1 } case object KAFKA_0_8_2 extends ApiVersion { val version: String = "0.8.2.X" - val messageFormatVersion: Byte = Message.MagicValue_V0 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0 val id: Int = 2 } case object KAFKA_0_9_0 extends ApiVersion { val version: String = "0.9.0.X" - val messageFormatVersion: Byte = Message.MagicValue_V0 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0 val id: Int = 3 } case object KAFKA_0_10_0_IV0 extends ApiVersion { val version: String = "0.10.0-IV0" - val messageFormatVersion: Byte = Message.MagicValue_V1 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 4 } case object KAFKA_0_10_0_IV1 extends ApiVersion { val version: String = "0.10.0-IV1" - val messageFormatVersion: Byte = Message.MagicValue_V1 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 5 } case object KAFKA_0_10_1_IV0 extends ApiVersion { val version: String = "0.10.1-IV0" - val messageFormatVersion: Byte = Message.MagicValue_V1 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 6 } case object KAFKA_0_10_1_IV1 extends ApiVersion { val version: String = "0.10.1-IV1" - val messageFormatVersion: Byte = Message.MagicValue_V1 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 7 } case object KAFKA_0_10_1_IV2 extends ApiVersion { val version: String = "0.10.1-IV2" - val messageFormatVersion: Byte = Message.MagicValue_V1 + val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1 val id: Int = 8 } http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 7e52a91..9eb92cd 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -25,7 +25,6 @@ import kafka.log.LogConfig import kafka.server._ import kafka.metrics.KafkaMetricsGroup import kafka.controller.KafkaController -import kafka.message.ByteBufferMessageSet import java.io.IOException import java.util.concurrent.locks.ReentrantReadWriteLock @@ -34,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge +import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.PartitionState import org.apache.kafka.common.utils.Time @@ -190,7 +190,7 @@ class Partition(val topic: String, allReplicas.foreach(replica => getOrCreateReplica(replica)) val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet // remove assigned replicas that have been removed by the controller - (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_)) + (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica) inSyncReplicas = newInSyncReplicas leaderEpoch = partitionStateInfo.leaderEpoch 
zkVersion = partitionStateInfo.zkVersion @@ -440,7 +440,7 @@ class Partition(val topic: String, laggingReplicas } - def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int = 0) = { + def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = { val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) { val leaderReplicaOpt = leaderReplicaIfLocal() leaderReplicaOpt match { @@ -455,7 +455,7 @@ class Partition(val topic: String, .format(topic, partitionId, inSyncSize, minIsr)) } - val info = log.append(messages, assignOffsets = true) + val info = log.append(records, assignOffsets = true) // probably unblock some follower fetch requests since log end offset has been updated replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 @@ -480,7 +480,7 @@ class Partition(val topic: String, newLeaderAndIsr, controllerEpoch, zkVersion) if(updateSucceeded) { - replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId)) + replicaManager.recordIsrChange(TopicAndPartition(topic, partitionId)) inSyncReplicas = newIsr zkVersion = newVersion trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion)) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index c47efb7..f702b9d 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -17,14 +17,16 @@ package kafka.consumer -import kafka.api.{OffsetRequest, Request, FetchRequestBuilder, FetchResponsePartitionData} +import kafka.api.{FetchRequestBuilder, FetchResponsePartitionData, OffsetRequest, Request} import kafka.cluster.BrokerEndPoint import kafka.message.ByteBufferMessageSet -import kafka.server.{PartitionFetchState, AbstractFetcherThread} +import kafka.server.{AbstractFetcherThread, PartitionFetchState} import kafka.common.{ErrorMapping, TopicAndPartition} + import scala.collection.Map import ConsumerFetcherThread._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.record.MemoryRecords class ConsumerFetcherThread(name: String, val config: ConsumerConfig, @@ -81,7 +83,7 @@ class ConsumerFetcherThread(name: String, case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime case _ => OffsetRequest.LatestTime } - val topicAndPartition = new TopicAndPartition(topicPartition.topic, topicPartition.partition) + val topicAndPartition = TopicAndPartition(topicPartition.topic, topicPartition.partition) val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId) val pti = partitionMap(topicPartition) pti.resetFetchOffset(newOffset) @@ -123,7 +125,7 @@ object ConsumerFetcherThread { class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData { def errorCode: Short = underlying.error - def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet] + def toRecords: MemoryRecords = underlying.messages.asInstanceOf[ByteBufferMessageSet].asRecords def highWatermark: Long = underlying.hw def exception: Option[Throwable] = if 
(errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode)) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 0c53345..db40482 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -458,14 +458,14 @@ class GroupCoordinator(val brokerId: Int, def handleFetchOffsets(groupId: String, partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = { if (!isActive.get) { - partitions.map { case topicPartition => + partitions.map { topicPartition => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap } else if (!isCoordinatorForGroup(groupId)) { debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId)) - partitions.map { case topicPartition => + partitions.map { topicPartition => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap } else if (isCoordinatorLoadingInProgress(groupId)) { - partitions.map { case topicPartition => + partitions.map { topicPartition => (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap } else { // return offsets blindly regardless the current group state since the group may be using http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index e55bcaa..a97b527 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -17,38 +17,31 @@ package kafka.coordinator -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} -import org.apache.kafka.common.protocol.types.Type.STRING -import org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING -import org.apache.kafka.common.protocol.types.Type.INT32 -import org.apache.kafka.common.protocol.types.Type.INT64 -import org.apache.kafka.common.protocol.types.Type.BYTES -import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse -import org.apache.kafka.common.requests.OffsetFetchResponse -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.utils.Time -import org.apache.kafka.clients.consumer.ConsumerRecord -import kafka.utils._ -import kafka.common._ -import kafka.message._ -import kafka.log.FileMessageSet -import kafka.metrics.KafkaMetricsGroup -import kafka.common.TopicAndPartition -import kafka.common.MessageFormatter -import kafka.server.ReplicaManager - -import scala.collection._ import java.io.PrintStream import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import 
java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0} +import kafka.common.{MessageFormatter, TopicAndPartition, _} +import kafka.metrics.KafkaMetricsGroup +import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock +import kafka.utils._ +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.types.Type._ +import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} +import org.apache.kafka.common.record._ +import org.apache.kafka.common.requests.OffsetFetchResponse +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.utils.{Time, Utils} + +import scala.collection.JavaConverters._ +import scala.collection._ class GroupMetadataManager(val brokerId: Int, val interBrokerProtocolVersion: ApiVersion, @@ -57,6 +50,8 @@ class GroupMetadataManager(val brokerId: Int, zkUtils: ZkUtils, time: Time) extends Logging with KafkaMetricsGroup { + private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec) + private val groupMetadataCache = new Pool[String, GroupMetadata] /* lock protecting access to loading and owned partition sets */ @@ -135,13 +130,11 @@ class GroupMetadataManager(val brokerId: Int, } } - def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Errors => Unit): Option[DelayedStore] = { - val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) - magicValueAndTimestampOpt match { - case Some((magicValue, timestamp)) => + getMagicAndTimestamp(partitionFor(group.groupId)) match { + case Some((magicValue, timestampType, timestamp)) => val groupMetadataValueVersion = { if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0) 0.toShort @@ -149,17 +142,12 @@ class GroupMetadataManager(val brokerId: Int, GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION } - val message = new Message( - key = GroupMetadataManager.groupMetadataKey(group.groupId), - bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion), - timestamp = timestamp, - magicValue = magicValue) + val record = Record.create(magicValue, timestampType, timestamp, + GroupMetadataManager.groupMetadataKey(group.groupId), + GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)) val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId)) - - val groupMetadataMessageSet = Map(groupMetadataPartition -> - new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message)) - + val groupMetadataRecords = Map(groupMetadataPartition -> MemoryRecords.withRecords(timestampType, compressionType, record)) val generationId = group.generationId // set the callback function to insert the created group into cache after log append completed @@ -212,7 +200,7 @@ class GroupMetadataManager(val brokerId: Int, responseCallback(responseError) } - Some(DelayedStore(groupMetadataMessageSet, putCacheCallback)) + Some(DelayedStore(groupMetadataRecords, putCacheCallback)) case None => responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP) @@ -222,11 +210,11 @@ class GroupMetadataManager(val brokerId: Int, def store(delayedStore: DelayedStore) { // call replica manager to append the group message - 
replicaManager.appendMessages( + replicaManager.appendRecords( config.offsetCommitTimeoutMs.toLong, config.offsetCommitRequiredAcks, true, // allow appending to internal offset topic - delayedStore.messageSet, + delayedStore.partitionRecords, delayedStore.callback) } @@ -244,22 +232,17 @@ class GroupMetadataManager(val brokerId: Int, } // construct the message set to append - val magicValueAndTimestampOpt = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) - magicValueAndTimestampOpt match { - case Some((magicValue, timestamp)) => - val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => - new Message( - key = GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition), - bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata), - timestamp = timestamp, - magicValue = magicValue - ) + getMagicAndTimestamp(partitionFor(group.groupId)) match { + case Some((magicValue, timestampType, timestamp)) => + val records = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + Record.create(magicValue, timestampType, timestamp, + GroupMetadataManager.offsetCommitKey(group.groupId, topicAndPartition.topic, topicAndPartition.partition), + GroupMetadataManager.offsetCommitValue(offsetAndMetadata)) }.toSeq val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId)) - val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> - new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) + val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*)) // set the callback function to insert offsets into cache after log append completed def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { @@ -330,7 +313,7 @@ class GroupMetadataManager(val brokerId: Int, group.prepareOffsetCommit(offsetMetadata) } - Some(DelayedStore(offsetsAndMetadataMessageSet, putCacheCallback)) + Some(DelayedStore(entries, putCacheCallback)) case None => val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => @@ -412,28 +395,30 @@ class GroupMetadataManager(val brokerId: Int, while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { buffer.clear() - val messages = log.read(currOffset, config.loadBufferSize, minOneMessage = true).messageSet.asInstanceOf[FileMessageSet] - messages.readInto(buffer, 0) - val messageSet = new ByteBufferMessageSet(buffer) - messageSet.foreach { msgAndOffset => - require(msgAndOffset.message.key != null, "Offset entry key should not be null") - val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) + val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords] + fileRecords.readInto(buffer, 0) + + MemoryRecords.readableRecords(buffer).deepIterator.asScala.foreach { entry => + val record = entry.record + + require(record.hasKey, "Offset entry key should not be null") + val baseKey = GroupMetadataManager.readMessageKey(record.key) if (baseKey.isInstanceOf[OffsetKey]) { // load offset val key = baseKey.key.asInstanceOf[GroupTopicPartition] - if (msgAndOffset.message.payload == null) { + if (record.hasNullValue) { loadedOffsets.remove(key) removedOffsets.add(key) } else { - val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload) + val value = 
GroupMetadataManager.readOffsetMessageValue(record.value) loadedOffsets.put(key, value) removedOffsets.remove(key) } } else { // load group metadata val groupId = baseKey.key.asInstanceOf[String] - val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload) + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) if (groupMetadata != null) { trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}") removedGroups.remove(groupId) @@ -444,7 +429,7 @@ class GroupMetadataManager(val brokerId: Int, } } - currOffset = msgAndOffset.nextOffset + currOffset = entry.nextOffset } } @@ -467,8 +452,8 @@ class GroupMetadataManager(val brokerId: Int, removedGroups.foreach { groupId => if (groupMetadataCache.contains(groupId)) - throw new IllegalStateException(s"Unexpected unload of active group ${groupId} while " + - s"loading partition ${topicPartition}") + throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + + s"loading partition $topicPartition") } if (!shuttingDown.get()) @@ -572,15 +557,15 @@ class GroupMetadataManager(val brokerId: Int, } val offsetsPartition = partitionFor(groupId) - getMessageFormatVersionAndTimestamp(offsetsPartition) match { - case Some((magicValue, timestamp)) => + getMagicAndTimestamp(offsetsPartition) match { + case Some((magicValue, timestampType, timestamp)) => val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition) partitionOpt.foreach { partition => val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition) val tombstones = expiredOffsets.map { case (topicPartition, offsetAndMetadata) => trace(s"Removing expired offset and metadata for $groupId, $topicPartition: $offsetAndMetadata") val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition) - new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue) + Record.create(magicValue, timestampType, timestamp, commitKey, null) }.toBuffer trace(s"Marked ${expiredOffsets.size} offsets in $appendPartition for deletion.") @@ -590,8 +575,7 @@ class GroupMetadataManager(val brokerId: Int, // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and // retry removing this group. 
- tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId), - timestamp = timestamp, magicValue = magicValue) + tombstones += Record.create(magicValue, timestampType, timestamp, GroupMetadataManager.groupMetadataKey(group.groupId), null) trace(s"Group $groupId removed from the metadata cache and marked for deletion in $appendPartition.") } @@ -599,7 +583,7 @@ class GroupMetadataManager(val brokerId: Int, try { // do not need to require acks since even if the tombstone is lost, // it will be appended again in the next purge cycle - partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*)) + partition.appendRecordsToLeader(MemoryRecords.withRecords(timestampType, compressionType, tombstones: _*)) offsetsRemoved += expiredOffsets.size trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired offsets and/or metadata for group $groupId") } catch { @@ -663,16 +647,11 @@ class GroupMetadataManager(val brokerId: Int, * @param partition Partition of GroupMetadataTopic * @return Option[(MessageFormatVersion, TimeStamp)] if replica is local, None otherwise */ - private def getMessageFormatVersionAndTimestamp(partition: Int): Option[(Byte, Long)] = { + private def getMagicAndTimestamp(partition: Int): Option[(Byte, TimestampType, Long)] = { val groupMetadataTopicAndPartition = TopicAndPartition(Topic.GroupMetadataTopicName, partition) - replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).map { messageFormatVersion => - val timestamp = { - if (messageFormatVersion == Message.MagicValue_V0) - Message.NoTimestamp - else - time.milliseconds() - } - (messageFormatVersion, timestamp) + replicaManager.getMagicAndTimestampType(groupMetadataTopicAndPartition).map { case (messageFormatVersion, timestampType) => + val timestamp = if (messageFormatVersion == Record.MAGIC_VALUE_V0) Record.NO_TIMESTAMP else time.milliseconds() + (messageFormatVersion, timestampType, timestamp) } } @@ -964,7 +943,7 @@ object GroupMetadataManager { * @return an offset-metadata object from the message */ def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = { - if(buffer == null) { // tombstone + if (buffer == null) { // tombstone null } else { val version = buffer.getShort @@ -997,7 +976,7 @@ object GroupMetadataManager { * @return a group metadata object from the message */ def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = { - if(buffer == null) { // tombstone + if (buffer == null) { // tombstone null } else { val version = buffer.getShort @@ -1016,23 +995,22 @@ object GroupMetadataManager { group.leaderId = value.get(LEADER_KEY).asInstanceOf[String] group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String] - memberMetadataArray.foreach { - case memberMetadataObj => - val memberMetadata = memberMetadataObj.asInstanceOf[Struct] - val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String] - val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String] - val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String] - val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int] - val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int] + memberMetadataArray.foreach { memberMetadataObj => + val memberMetadata = memberMetadataObj.asInstanceOf[Struct] + val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String] + val 
clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String] + val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String] + val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int] + val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int] - val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer]) + val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer]) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, - protocolType, List((group.protocol, subscription))) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, + protocolType, List((group.protocol, subscription))) - member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer]) + member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer]) - group.add(memberId, member) + group.add(memberId, member) } group @@ -1087,7 +1065,7 @@ object GroupMetadataManager { } -case class DelayedStore(messageSet: Map[TopicPartition, MessageSet], +case class DelayedStore(partitionRecords: Map[TopicPartition, MemoryRecords], callback: Map[TopicPartition, PartitionResponse] => Unit) case class GroupTopicPartition(group: String, topicPartition: TopicPartition) { http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/FileMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala deleted file mode 100755 index 506f5b9..0000000 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ /dev/null @@ -1,445 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log - -import java.io._ -import java.nio._ -import java.nio.channels._ -import java.util.concurrent.atomic._ -import java.util.concurrent.TimeUnit - -import kafka.utils._ -import kafka.message._ -import kafka.common.KafkaException -import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} -import org.apache.kafka.common.errors.CorruptRecordException -import org.apache.kafka.common.record.FileRecords -import org.apache.kafka.common.utils.Utils - -import scala.collection.mutable.ArrayBuffer - -/** - * An on-disk message set. An optional start and end position can be applied to the message set - * which will allow slicing a subset of the file. 
- * @param file The file name for the underlying log data - * @param channel the underlying file channel used - * @param start A lower bound on the absolute position in the file from which the message set begins - * @param end The upper bound on the absolute position in the file at which the message set ends - * @param isSlice Should the start and end parameters be used for slicing? - */ -@nonthreadsafe -class FileMessageSet private[kafka](@volatile var file: File, - private[log] val channel: FileChannel, - private[log] val start: Int, - private[log] val end: Int, - isSlice: Boolean) extends MessageSet { - /* the size of the message set in bytes */ - private val _size = - if(isSlice) - new AtomicInteger(end - start) // don't check the file size if this is just a slice view - else - new AtomicInteger(math.min(channel.size.toInt, end) - start) - - /* if this is not a slice, update the file pointer to the end of the file */ - if (!isSlice) - /* set the file position to the last byte in the file */ - channel.position(math.min(channel.size.toInt, end)) - - /** - * Create a file message set with no slicing. - */ - def this(file: File, channel: FileChannel) = - this(file, channel, start = 0, end = Int.MaxValue, isSlice = false) - - /** - * Create a file message set with no slicing - */ - def this(file: File) = - this(file, FileMessageSet.openChannel(file, mutable = true)) - - /** - * Create a file message set with no slicing, and with initFileSize and preallocate. - * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize - * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance. - * If it's new file and preallocate is true, end will be set to 0. Otherwise set to Int.MaxValue. - */ - def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) = - this(file, - channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate), - start = 0, - end = if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue, - isSlice = false) - - /** - * Create a file message set with mutable option - */ - def this(file: File, mutable: Boolean) = this(file, FileMessageSet.openChannel(file, mutable)) - - /** - * Create a slice view of the file message set that begins and ends at the given byte offsets - */ - def this(file: File, channel: FileChannel, start: Int, end: Int) = - this(file, channel, start, end, isSlice = true) - - /** - * Return a message set which is a view into this set starting from the given position and with the given size limit. - * - * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read. - * - * If this message set is already sliced, the position will be taken relative to that slicing. 
- * - * @param position The start position to begin the read from - * @param size The number of bytes after the start position to include - * - * @return A sliced wrapper on this message set limited based on the given position and size - */ - def read(position: Int, size: Int): FileMessageSet = { - if(position < 0) - throw new IllegalArgumentException("Invalid position: " + position) - if(size < 0) - throw new IllegalArgumentException("Invalid size: " + size) - new FileMessageSet(file, - channel, - start = this.start + position, - end = { - // Handle the integer overflow - if (this.start + position + size < 0) - sizeInBytes() - else - math.min(this.start + position + size, sizeInBytes()) - }) - } - - override def asRecords: FileRecords = new FileRecords(file, channel, start, end, isSlice) - - /** - * Search forward for the file position of the last offset that is greater than or equal to the target offset - * and return its physical position and the size of the message (including log overhead) at the returned offset. If - * no such offsets are found, return null. - * - * @param targetOffset The offset to search for. - * @param startingPosition The starting position in the file to begin searching from. - */ - def searchForOffsetWithSize(targetOffset: Long, startingPosition: Int): (OffsetPosition, Int) = { - var position = startingPosition - val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) - val size = sizeInBytes() - while(position + MessageSet.LogOverhead < size) { - buffer.rewind() - channel.read(buffer, position) - if(buffer.hasRemaining) - throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s" - .format(targetOffset, startingPosition, file.getAbsolutePath)) - buffer.rewind() - val offset = buffer.getLong() - val messageSize = buffer.getInt() - if (messageSize < Message.MinMessageOverhead) - throw new IllegalStateException("Invalid message size: " + messageSize) - if (offset >= targetOffset) - return (OffsetPosition(offset, position), messageSize + MessageSet.LogOverhead) - position += MessageSet.LogOverhead + messageSize - } - null - } - - /** - * Search forward for the message whose timestamp is greater than or equals to the target timestamp. - * - * @param targetTimestamp The timestamp to search for. - * @param startingPosition The starting position to search. - * @return The timestamp and offset of the message found. None, if no message is found. - */ - def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[TimestampOffset] = { - val messagesToSearch = read(startingPosition, sizeInBytes) - for (messageAndOffset <- messagesToSearch) { - val message = messageAndOffset.message - if (message.timestamp >= targetTimestamp) { - // We found a message - message.compressionCodec match { - case NoCompressionCodec => - return Some(TimestampOffset(messageAndOffset.message.timestamp, messageAndOffset.offset)) - case _ => - // Iterate over the inner messages to get the exact offset. 
- for (innerMessageAndOffset <- ByteBufferMessageSet.deepIterator(messageAndOffset)) { - val timestamp = innerMessageAndOffset.message.timestamp - if (timestamp >= targetTimestamp) - return Some(TimestampOffset(innerMessageAndOffset.message.timestamp, innerMessageAndOffset.offset)) - } - throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" + - s" should contain target timestamp $targetTimestamp but it does not.") - } - } - } - None - } - - /** - * Return the largest timestamp of the messages after a given position in this file message set. - * @param startingPosition The starting position. - * @return The largest timestamp of the messages after the given position. - */ - def largestTimestampAfter(startingPosition: Int): TimestampOffset = { - var maxTimestamp = Message.NoTimestamp - var offsetOfMaxTimestamp = -1L - val messagesToSearch = read(startingPosition, Int.MaxValue) - for (messageAndOffset <- messagesToSearch) { - if (messageAndOffset.message.timestamp > maxTimestamp) { - maxTimestamp = messageAndOffset.message.timestamp - offsetOfMaxTimestamp = messageAndOffset.offset - } - } - TimestampOffset(maxTimestamp, offsetOfMaxTimestamp) - } - - /** - * This method is called before we write messages to the socket using zero-copy transfer. We need to - * make sure all the messages in the message set have the expected magic value. - * - * @param expectedMagicValue the magic value expected - * @return true if all messages have expected magic value, false otherwise - */ - override def isMagicValueInAllWrapperMessages(expectedMagicValue: Byte): Boolean = { - var location = start - val offsetAndSizeBuffer = ByteBuffer.allocate(MessageSet.LogOverhead) - val crcAndMagicByteBuffer = ByteBuffer.allocate(Message.CrcLength + Message.MagicLength) - while (location < end) { - offsetAndSizeBuffer.rewind() - channel.read(offsetAndSizeBuffer, location) - if (offsetAndSizeBuffer.hasRemaining) - return true - offsetAndSizeBuffer.rewind() - offsetAndSizeBuffer.getLong // skip offset field - val messageSize = offsetAndSizeBuffer.getInt - if (messageSize < Message.MinMessageOverhead) - throw new IllegalStateException("Invalid message size: " + messageSize) - crcAndMagicByteBuffer.rewind() - channel.read(crcAndMagicByteBuffer, location + MessageSet.LogOverhead) - if (crcAndMagicByteBuffer.get(Message.MagicOffset) != expectedMagicValue) - return false - location += (MessageSet.LogOverhead + messageSize) - } - true - } - - /** - * Convert this message set to use the specified message format. - */ - def toMessageFormat(toMagicValue: Byte): MessageSet = { - val offsets = new ArrayBuffer[Long] - val newMessages = new ArrayBuffer[Message] - this.foreach { messageAndOffset => - val message = messageAndOffset.message - if (message.compressionCodec == NoCompressionCodec) { - newMessages += message.toFormatVersion(toMagicValue) - offsets += messageAndOffset.offset - } else { - // File message set only has shallow iterator. We need to do deep iteration here if needed. - val deepIter = ByteBufferMessageSet.deepIterator(messageAndOffset) - for (innerMessageAndOffset <- deepIter) { - newMessages += innerMessageAndOffset.message.toFormatVersion(toMagicValue) - offsets += innerMessageAndOffset.offset - } - } - } - - if (sizeInBytes > 0 && newMessages.isEmpty) { - // This indicates that the message is too large. We just return all the bytes in the file message set. 
- this - } else { - // We use the offset seq to assign offsets so the offset of the messages does not change. - new ByteBufferMessageSet( - compressionCodec = this.headOption.map(_.message.compressionCodec).getOrElse(NoCompressionCodec), - offsetSeq = offsets, - newMessages: _*) - } - } - - /** - * Get a shallow iterator over the messages in the set. - */ - override def iterator: Iterator[MessageAndOffset] = iterator(Int.MaxValue) - - /** - * Get an iterator over the messages in the set. We only do shallow iteration here. - * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory. - * If we encounter a message larger than this we throw an InvalidMessageException. - * @return The iterator. - */ - def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = { - new IteratorTemplate[MessageAndOffset] { - var location = start - val sizeOffsetLength = 12 - val sizeOffsetBuffer = ByteBuffer.allocate(sizeOffsetLength) - - override def makeNext(): MessageAndOffset = { - if(location + sizeOffsetLength >= end) - return allDone() - - // read the size of the item - sizeOffsetBuffer.rewind() - channel.read(sizeOffsetBuffer, location) - if(sizeOffsetBuffer.hasRemaining) - return allDone() - - sizeOffsetBuffer.rewind() - val offset = sizeOffsetBuffer.getLong() - val size = sizeOffsetBuffer.getInt() - if(size < Message.MinMessageOverhead || location + sizeOffsetLength + size > end) - return allDone() - if(size > maxMessageSize) - throw new CorruptRecordException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize)) - - // read the item itself - val buffer = ByteBuffer.allocate(size) - channel.read(buffer, location + sizeOffsetLength) - if(buffer.hasRemaining) - return allDone() - buffer.rewind() - - // increment the location and return the item - location += size + sizeOffsetLength - MessageAndOffset(new Message(buffer), offset) - } - } - } - - /** - * The number of bytes taken up by this file set - */ - def sizeInBytes(): Int = _size.get() - - /** - * Append these messages to the message set - */ - def append(messages: ByteBufferMessageSet) { - val written = messages.writeFullyTo(channel) - _size.getAndAdd(written) - } - - /** - * Commit all written data to the physical disk - */ - def flush() = { - channel.force(true) - } - - /** - * Close this message set - */ - def close() { - flush() - trim() - channel.close() - } - - /** - * Trim file when close or roll to next file - */ - def trim() { - truncateTo(sizeInBytes()) - } - - /** - * Delete this message set from the filesystem - * @return True iff this message set was deleted. - */ - def delete(): Boolean = { - CoreUtils.swallow(channel.close()) - file.delete() - } - - /** - * Truncate this file message set to the given size in bytes. Note that this API does no checking that the - * given size falls on a valid message boundary. - * In some versions of the JDK truncating to the same size as the file message set will cause an - * update of the files mtime, so truncate is only performed if the targetSize is smaller than the - * size of the underlying FileChannel. - * It is expected that no other threads will do writes to the log when this function is called. - * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes. 
- * @return The number of bytes truncated off - */ - def truncateTo(targetSize: Int): Int = { - val originalSize = sizeInBytes - if(targetSize > originalSize || targetSize < 0) - throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " + - " size of this log segment is " + originalSize + " bytes.") - if (targetSize < channel.size.toInt) { - channel.truncate(targetSize) - channel.position(targetSize) - _size.set(targetSize) - } - originalSize - targetSize - } - - /** - * Read from the underlying file into the buffer starting at the given position - */ - def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = { - channel.read(buffer, relativePosition + this.start) - buffer.flip() - buffer - } - - /** - * Rename the file that backs this message set - * @throws IOException if rename fails. - */ - def renameTo(f: File) { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath) - finally this.file = f - } - -} - -object FileMessageSet extends Logging -{ - //preserve the previous logger name after moving logger aspect from FileMessageSet to companion - override val loggerName = classOf[FileMessageSet].getName - - /** - * Open a channel for the given file - * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize - * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance. - * @param file File path - * @param mutable mutable - * @param fileAlreadyExists File already exists or not - * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024 - * @param preallocate Pre allocate file or not, gotten from configuration. - */ - def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = { - if (mutable) { - if (fileAlreadyExists) - new RandomAccessFile(file, "rw").getChannel() - else { - if (preallocate) { - val randomAccessFile = new RandomAccessFile(file, "rw") - randomAccessFile.setLength(initFileSize) - randomAccessFile.getChannel() - } - else - new RandomAccessFile(file, "rw").getChannel() - } - } - else - new FileInputStream(file).getChannel() - } -} - -object LogFlushStats extends KafkaMetricsGroup { - val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)) -} http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 6acc8d2..d58a066 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -19,7 +19,6 @@ package kafka.log import kafka.api.KAFKA_0_10_0_IV0 import kafka.utils._ -import kafka.message._ import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} @@ -29,16 +28,18 @@ import java.util.concurrent.atomic._ import java.text.NumberFormat import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} -import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.ListOffsetRequest import scala.collection.Seq import scala.collection.JavaConverters._ import com.yammer.metrics.core.Gauge 
import org.apache.kafka.common.utils.{Time, Utils} +import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} object LogAppendInfo { - val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) + val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Record.NO_TIMESTAMP, -1L, Record.NO_TIMESTAMP, + NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) } /** @@ -243,7 +244,7 @@ class Log(@volatile var dir: File, val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix) val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize) - val swapSegment = new LogSegment(new FileMessageSet(file = swapFile), + val swapSegment = new LogSegment(FileRecords.open(swapFile), index = index, timeIndex = timeIndex, baseOffset = startOffset, @@ -338,20 +339,20 @@ class Log(@volatile var dir: File, * This method will generally be responsible for assigning offsets to the messages, * however if the assignOffsets=false flag is passed we will only check that the existing offsets are valid. * - * @param messages The message set to append + * @param records The log records to append * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given * @throws KafkaStorageException If the append fails due to an I/O error. * @return Information about the appended messages including the first and last offset. */ - def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = { - val appendInfo = analyzeAndValidateMessageSet(messages) + def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = { + val appendInfo = analyzeAndValidateRecords(records) // if we have any valid messages, append them to the log if (appendInfo.shallowCount == 0) return appendInfo // trim any invalid bytes or partial messages before appending it to the on-disk log - var validMessages = trimInvalidBytes(messages, appendInfo) + var validRecords = trimInvalidBytes(records, appendInfo) try { // they are valid, insert them in the log @@ -363,20 +364,21 @@ class Log(@volatile var dir: File, appendInfo.firstOffset = offset.value val now = time.milliseconds val validateAndOffsetAssignResult = try { - validMessages.validateMessagesAndAssignOffsets(offset, - now, - appendInfo.sourceCodec, - appendInfo.targetCodec, - config.compact, - config.messageFormatVersion.messageFormatVersion, - config.messageTimestampType, - config.messageTimestampDifferenceMaxMs) + LogValidator.validateMessagesAndAssignOffsets(validRecords, + offset, + now, + appendInfo.sourceCodec, + appendInfo.targetCodec, + config.compact, + config.messageFormatVersion.messageFormatVersion, + config.messageTimestampType, + config.messageTimestampDifferenceMaxMs) } catch { case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e) } - validMessages = validateAndOffsetAssignResult.validatedMessages + validRecords = validateAndOffsetAssignResult.validatedRecords appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp - appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp + appendInfo.offsetOfMaxTimestamp = 
validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp appendInfo.lastOffset = offset.value - 1 if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME) appendInfo.logAppendTime = now @@ -384,14 +386,14 @@ class Log(@volatile var dir: File, // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message // format conversion) if (validateAndOffsetAssignResult.messageSizeMaybeChanged) { - for (messageAndOffset <- validMessages.shallowIterator) { - if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) { + for (logEntry <- validRecords.shallowIterator.asScala) { + if (logEntry.sizeInBytes > config.maxMessageSize) { // we record the original message set size instead of the trimmed size // to be consistent with pre-compression bytesRejectedRate recording - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." - .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize)) + .format(logEntry.sizeInBytes, config.maxMessageSize)) } } } @@ -399,28 +401,27 @@ class Log(@volatile var dir: File, } else { // we are taking the offsets we are given if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) - throw new IllegalArgumentException("Out of order offsets found in " + messages) + throw new IllegalArgumentException("Out of order offsets found in " + records.deepIterator.asScala.map(_.offset)) } // check messages set size may be exceed config.segmentSize - if (validMessages.sizeInBytes > config.segmentSize) { + if (validRecords.sizeInBytes > config.segmentSize) { throw new RecordBatchTooLargeException("Message set size is %d bytes which exceeds the maximum configured segment size of %d." 
- .format(validMessages.sizeInBytes, config.segmentSize)) + .format(validRecords.sizeInBytes, config.segmentSize)) } // maybe roll the log if this segment is full - val segment = maybeRoll(messagesSize = validMessages.sizeInBytes, - maxTimestampInMessages = appendInfo.maxTimestamp) + val segment = maybeRoll(messagesSize = validRecords.sizeInBytes, maxTimestampInMessages = appendInfo.maxTimestamp) // now append to the log segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp, - offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages) + shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp, records = validRecords) // increment the log end offset updateLogEndOffset(appendInfo.lastOffset + 1) trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s" - .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validMessages)) + .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords)) if (unflushedMessages >= config.flushInterval) flush() @@ -449,73 +450,74 @@ class Log(@volatile var dir: File, * <li> Whether any compression codec is used (if many are used, then the last one is given) * </ol> */ - private def analyzeAndValidateMessageSet(messages: ByteBufferMessageSet): LogAppendInfo = { + private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = { var shallowMessageCount = 0 var validBytesCount = 0 var firstOffset, lastOffset = -1L var sourceCodec: CompressionCodec = NoCompressionCodec var monotonic = true - var maxTimestamp = Message.NoTimestamp + var maxTimestamp = Record.NO_TIMESTAMP var offsetOfMaxTimestamp = -1L - for(messageAndOffset <- messages.shallowIterator) { + for (entry <- records.shallowIterator.asScala) { // update the first offset if on the first message if(firstOffset < 0) - firstOffset = messageAndOffset.offset + firstOffset = entry.offset // check that offsets are monotonically increasing - if(lastOffset >= messageAndOffset.offset) + if(lastOffset >= entry.offset) monotonic = false // update the last offset seen - lastOffset = messageAndOffset.offset + lastOffset = entry.offset - val m = messageAndOffset.message + val record = entry.record // Check if the message sizes are valid. - val messageSize = MessageSet.entrySize(m) + val messageSize = entry.sizeInBytes if(messageSize > config.maxMessageSize) { - BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes) - BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes) + BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(records.sizeInBytes) + BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(records.sizeInBytes) throw new RecordTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d." 
.format(messageSize, config.maxMessageSize)) } // check the validity of the message by checking CRC - m.ensureValid() - if (m.timestamp > maxTimestamp) { - maxTimestamp = m.timestamp + record.ensureValid() + if (record.timestamp > maxTimestamp) { + maxTimestamp = record.timestamp offsetOfMaxTimestamp = lastOffset } shallowMessageCount += 1 validBytesCount += messageSize - val messageCodec = m.compressionCodec - if(messageCodec != NoCompressionCodec) + val messageCodec = CompressionCodec.getCompressionCodec(record.compressionType.id) + if (messageCodec != NoCompressionCodec) sourceCodec = messageCodec } // Apply broker-side compression if any val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec) - LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic) + LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Record.NO_TIMESTAMP, sourceCodec, + targetCodec, shallowMessageCount, validBytesCount, monotonic) } /** * Trim any invalid bytes from the end of this message set (if there are any) * - * @param messages The message set to trim + * @param records The records to trim * @param info The general information of the message set * @return A trimmed message set. This may be the same as what was passed in or it may not. */ - private def trimInvalidBytes(messages: ByteBufferMessageSet, info: LogAppendInfo): ByteBufferMessageSet = { - val messageSetValidBytes = info.validBytes - if(messageSetValidBytes < 0) - throw new CorruptRecordException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") - if(messageSetValidBytes == messages.sizeInBytes) { - messages + private def trimInvalidBytes(records: MemoryRecords, info: LogAppendInfo): MemoryRecords = { + val validBytes = info.validBytes + if (validBytes < 0) + throw new CorruptRecordException("Illegal length of message set " + validBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests") + if (validBytes == records.sizeInBytes) { + records } else { // trim invalid bytes - val validByteBuffer = messages.buffer.duplicate() - validByteBuffer.limit(messageSetValidBytes) - new ByteBufferMessageSet(validByteBuffer) + val validByteBuffer = records.buffer.duplicate() + validByteBuffer.limit(validBytes) + MemoryRecords.readableRecords(validByteBuffer) } } @@ -538,7 +540,7 @@ class Log(@volatile var dir: File, val currentNextOffsetMetadata = nextOffsetMetadata val next = currentNextOffsetMetadata.messageOffset if(startOffset == next) - return FetchDataInfo(currentNextOffsetMetadata, MessageSet.Empty) + return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY) var entry = segments.floorEntry(startOffset) @@ -578,7 +580,7 @@ class Log(@volatile var dir: File, // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, // this can happen when all messages with offset larger than start offsets have been deleted. // In this case, we will return the empty set with log end offset metadata - FetchDataInfo(nextOffsetMetadata, MessageSet.Empty) + FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) } /** @@ -610,9 +612,9 @@ class Log(@volatile var dir: File, val segmentsCopy = logSegments.toBuffer // For the earliest and latest, we do not need to return the timestamp. 
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) - return Some(TimestampOffset(Message.NoTimestamp, segmentsCopy.head.baseOffset)) + return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset)) else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) - return Some(TimestampOffset(Message.NoTimestamp, logEndOffset)) + return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset)) val targetSeg = { // Get all the segments whose largest timestamp is smaller than target timestamp @@ -656,7 +658,7 @@ class Log(@volatile var dir: File, if (segments.size == numToDelete) roll() // remove the segments for lookups - deletable.foreach(deleteSegment(_)) + deletable.foreach(deleteSegment) } numToDelete } @@ -865,7 +867,7 @@ class Log(@volatile var dir: File, truncateFullyAndStartAt(targetOffset) } else { val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset) - deletable.foreach(deleteSegment(_)) + deletable.foreach(deleteSegment) activeSegment.truncateTo(targetOffset) updateLogEndOffset(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) @@ -882,7 +884,7 @@ class Log(@volatile var dir: File, debug("Truncate and start log '" + name + "' to " + newOffset) lock synchronized { val segmentsToDelete = logSegments.toList - segmentsToDelete.foreach(deleteSegment(_)) + segmentsToDelete.foreach(deleteSegment) addSegment(new LogSegment(dir, newOffset, indexIntervalBytes = config.indexInterval, http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 4a76b0c..c5a73d5 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -17,20 +17,21 @@ package kafka.log -import java.io.{DataOutputStream, File} +import java.io.File import java.nio._ import java.util.Date import java.util.concurrent.{CountDownLatch, TimeUnit} import com.yammer.metrics.core.Gauge import kafka.common._ -import kafka.message._ import kafka.metrics.KafkaMetricsGroup import kafka.utils._ +import org.apache.kafka.common.record.{FileRecords, LogEntry, MemoryRecords} import org.apache.kafka.common.utils.Time +import MemoryRecords.LogEntryFilter -import scala.Iterable import scala.collection._ +import JavaConverters._ /** * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy. 
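Note on the append path above: Log.append now operates on MemoryRecords rather than ByteBufferMessageSet, and analyzeAndValidateRecords walks the shallow entries (one LogEntry per wrapper message, compressed or not) to collect offsets, the max timestamp and CRC validity. The Scala sketch below illustrates that iteration pattern using only accessors that appear in this diff (readableRecords, shallowIterator, offset, record, timestamp, ensureValid); the ShallowSummary object and its summarize method are illustrative names, not part of this commit.

    import java.nio.ByteBuffer

    import org.apache.kafka.common.record.{MemoryRecords, Record}

    import scala.collection.JavaConverters._

    // Illustrative helper, not part of this commit: summarise the shallow entries of a
    // buffer of records the way Log.analyzeAndValidateRecords does.
    object ShallowSummary {

      // Returns (shallow entry count, last offset seen, max timestamp seen).
      def summarize(buffer: ByteBuffer): (Int, Long, Long) = {
        val records = MemoryRecords.readableRecords(buffer)
        var shallowCount = 0
        var lastOffset = -1L
        var maxTimestamp = Record.NO_TIMESTAMP

        for (entry <- records.shallowIterator.asScala) {
          entry.record.ensureValid()                // CRC check, as in the append path
          if (entry.record.timestamp > maxTimestamp)
            maxTimestamp = entry.record.timestamp
          lastOffset = entry.offset
          shallowCount += 1
        }

        (shallowCount, lastOffset, maxTimestamp)
      }
    }

Because the iteration is shallow, a compressed wrapper counts as a single entry, which is why the size check in the append path compares logEntry.sizeInBytes against config.maxMessageSize.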
@@ -390,10 +391,10 @@ private[log] class Cleaner(val id: Int, val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix) indexFile.delete() timeIndexFile.delete() - val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate) + val records = FileRecords.open(logFile, false, log.initFileSize(), log.config.preallocate) val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize) val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize) - val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) + val cleaned = new LogSegment(records, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time) try { // clean segments into the new destination segment @@ -449,8 +450,12 @@ private[log] class Cleaner(val id: Int, retainDeletes: Boolean, maxLogMessageSize: Int, stats: CleanerStats) { - def shouldRetain(messageAndOffset: MessageAndOffset): Boolean = - shouldRetainMessage(source, map, retainDeletes, messageAndOffset, stats) + def shouldRetainEntry(logEntry: LogEntry): Boolean = + shouldRetainMessage(source, map, retainDeletes, logEntry, stats) + + class LogCleanerFilter extends LogEntryFilter { + def shouldRetain(logEntry: LogEntry): Boolean = shouldRetainEntry(logEntry) + } var position = 0 while (position < source.log.sizeInBytes) { @@ -460,10 +465,9 @@ private[log] class Cleaner(val id: Int, writeBuffer.clear() source.log.readInto(readBuffer, position) - val messages = new ByteBufferMessageSet(readBuffer) - throttler.maybeThrottle(messages.sizeInBytes) - val result = messages.filterInto(writeBuffer, shouldRetain) - + val records = MemoryRecords.readableRecords(readBuffer) + throttler.maybeThrottle(records.sizeInBytes) + val result = records.filterTo(new LogCleanerFilter, writeBuffer) stats.readMessages(result.messagesRead, result.bytesRead) stats.recopyMessages(result.messagesRetained, result.bytesRetained) @@ -472,9 +476,10 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out if (writeBuffer.position > 0) { writeBuffer.flip() - val retained = new ByteBufferMessageSet(writeBuffer) - dest.append(firstOffset = retained.head.offset, largestTimestamp = result.maxTimestamp, - offsetOfLargestTimestamp = result.offsetOfMaxTimestamp, messages = retained) + + val retained = MemoryRecords.readableRecords(writeBuffer) + dest.append(firstOffset = retained.deepIterator().next().offset, largestTimestamp = result.maxTimestamp, + shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained) throttler.maybeThrottle(writeBuffer.limit) } @@ -488,21 +493,22 @@ private[log] class Cleaner(val id: Int, private def shouldRetainMessage(source: kafka.log.LogSegment, map: kafka.log.OffsetMap, retainDeletes: Boolean, - entry: kafka.message.MessageAndOffset, + entry: LogEntry, stats: CleanerStats): Boolean = { val pastLatestOffset = entry.offset > map.latestOffset if (pastLatestOffset) return true - val key = entry.message.key - if (key != null) { + + if (entry.record.hasKey) { + val key = entry.record.key val foundOffset = map.get(key) /* two cases in which we can get rid of a message: * 1) if there exists a message with the same key but higher offset * 2) if the message is a delete "tombstone" marker and 
enough time has passed */ val redundant = foundOffset >= 0 && entry.offset < foundOffset - val obsoleteDelete = !retainDeletes && entry.message.isNull + val obsoleteDelete = !retainDeletes && entry.record.hasNullValue !redundant && !obsoleteDelete } else { stats.invalidMessage() @@ -620,12 +626,12 @@ private[log] class Cleaner(val id: Int, checkDone(topicAndPartition) readBuffer.clear() segment.log.readInto(readBuffer, position) - val messages = new ByteBufferMessageSet(readBuffer) - throttler.maybeThrottle(messages.sizeInBytes) + val records = MemoryRecords.readableRecords(readBuffer) + throttler.maybeThrottle(records.sizeInBytes) val startPosition = position - for (entry <- messages) { - val message = entry.message + for (entry <- records.deepIterator.asScala) { + val message = entry.record if (message.hasKey && entry.offset >= start) { if (map.size < maxDesiredMapSize) map.put(message.key, entry.offset) @@ -634,8 +640,9 @@ private[log] class Cleaner(val id: Int, } stats.indexMessagesRead(1) } - position += messages.validBytes - stats.indexBytesRead(messages.validBytes) + val bytesRead = records.validBytes + position += bytesRead + stats.indexBytesRead(bytesRead) // if we didn't read even one complete message, our read buffer may be too small if(position == startPosition) http://git-wip-us.apache.org/repos/asf/kafka/blob/67f1e5b9/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index ed79946..953fca4 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -440,7 +440,7 @@ class LogManager(val logDirs: Array[File], removedLog.dir = renamedDir // change the file pointers for log and index file for (logSegment <- removedLog.logSegments) { - logSegment.log.file = new File(renamedDir, logSegment.log.file.getName) + logSegment.log.setFile(new File(renamedDir, logSegment.log.file.getName)) logSegment.index.file = new File(renamedDir, logSegment.index.file.getName) }
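In Cleaner.cleanInto above, retention filtering moves from ByteBufferMessageSet.filterInto(writeBuffer, shouldRetain) to MemoryRecords.filterTo(filter, writeBuffer), with the predicate expressed as a MemoryRecords.LogEntryFilter. Below is a minimal Scala sketch of that pattern, retaining only keyed entries with non-null values (roughly the tombstone rule in shouldRetainMessage); FilterToExample and KeyedNonTombstones are illustrative names, not part of this commit.

    import java.nio.ByteBuffer

    import org.apache.kafka.common.record.{LogEntry, MemoryRecords}
    import org.apache.kafka.common.record.MemoryRecords.LogEntryFilter

    // Illustrative only, not part of this commit.
    object FilterToExample {

      // Keep entries that have a key and a non-null value, i.e. drop unkeyed messages
      // and delete "tombstones", roughly the rule applied by shouldRetainMessage.
      object KeyedNonTombstones extends LogEntryFilter {
        def shouldRetain(entry: LogEntry): Boolean =
          entry.record.hasKey && !entry.record.hasNullValue
      }

      // Copy retained entries from readBuffer into writeBuffer, as cleanInto does,
      // and return the retained message count reported by the filter result.
      def filter(readBuffer: ByteBuffer, writeBuffer: ByteBuffer): Int = {
        val records = MemoryRecords.readableRecords(readBuffer)
        val result = records.filterTo(KeyedNonTombstones, writeBuffer)
        result.messagesRetained
      }
    }

Passing the destination buffer explicitly keeps allocation under the caller's control, which is why cleanInto clears and reuses readBuffer and writeBuffer on every pass and throttles on writeBuffer.limit after the copy.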
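The buildOffsetMap change above replaces the old deep iteration over a ByteBufferMessageSet with records.deepIterator.asScala, which descends into compressed wrappers and yields one LogEntry per inner record, so every key is visited. The sketch below shows the same pattern, collecting the latest offset per key into a plain mutable map instead of kafka.log.OffsetMap; DeepIterationExample and latestOffsetPerKey are illustrative names, not part of this commit.

    import java.nio.ByteBuffer

    import org.apache.kafka.common.record.MemoryRecords

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    // Illustrative only, not part of this commit.
    object DeepIterationExample {

      // Map each key to the highest offset seen for it, skipping unkeyed records,
      // mirroring the loop in Cleaner.buildOffsetMap.
      def latestOffsetPerKey(buffer: ByteBuffer): mutable.Map[ByteBuffer, Long] = {
        val records = MemoryRecords.readableRecords(buffer)
        val offsets = mutable.Map.empty[ByteBuffer, Long]
        for (entry <- records.deepIterator.asScala) {
          val record = entry.record
          if (record.hasKey)
            offsets.put(record.key, entry.offset)   // record.key is a ByteBuffer
        }
        offsets
      }
    }

As in buildOffsetMap, a later occurrence of the same key simply overwrites the earlier entry, so the map ends up holding the highest offset per key.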