Repository: kafka Updated Branches: refs/heads/0.8.2 9b6744d3a -> 432d397af
KAFKA-1841; OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao; reviewed by Joel Koshy Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/432d397a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/432d397a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/432d397a Branch: refs/heads/0.8.2 Commit: 432d397af8a1d4467fe8041bcff8790864010a80 Parents: 9b6744d Author: Jun Rao <jun...@gmail.com> Authored: Mon Jan 12 22:32:31 2015 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Mon Jan 12 22:32:31 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/common/protocol/Protocol.java | 36 +++++++- .../common/requests/OffsetCommitRequest.java | 22 ++++- .../scala/kafka/api/OffsetCommitRequest.scala | 21 +++-- .../scala/kafka/api/OffsetCommitResponse.scala | 5 +- .../scala/kafka/api/OffsetFetchRequest.scala | 4 +- .../kafka/common/OffsetMetadataAndError.scala | 2 +- .../src/main/scala/kafka/server/KafkaApis.scala | 96 ++++++++++++++++---- .../api/RequestResponseSerializationTest.scala | 4 +- 8 files changed, 151 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 7517b87..f0a262e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -111,6 +111,16 @@ public class Protocol { new Field("offset", INT64, "Message offset to be committed."), + new Field("metadata", + STRING, + "Any associated metadata the client wants to keep.")); + + public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("offset", + INT64, + "Message offset to be committed."), new Field("timestamp", INT64, "Timestamp of the commit"), @@ -125,6 +135,13 @@ public class Protocol { new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0), "Partitions to commit offsets.")); + public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic", + STRING, + "Topic to commit."), + new Field("partitions", + new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1), + "Partitions to commit offsets.")); + public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."), @@ -142,7 +159,7 @@ public class Protocol { STRING, "The consumer id assigned by the group coordinator."), new Field("topics", - new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0), + new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1), "Topics to commit offsets.")); public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", @@ -158,9 +175,11 @@ public class Protocol { public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); - public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 }; /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */ - public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0}; + public static Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0; + + public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 }; + public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1}; /* Offset fetch api */ public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -181,6 +200,10 @@ public class Protocol { new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0), "Topics to fetch offsets.")); + // version 0 and 1 have exactly the same wire format, but different functionality. + // version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka. + public static Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0; + public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -200,8 +223,11 @@ public class Protocol { public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0))); - public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 }; - public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 }; + /* The response types for both V0 and V1 of OFFSET_FETCH_RESPONSE are the same. */ + public static Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0; + + public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1 }; + public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1 }; /* List offset api */ public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition", http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java index 3ee5cba..66c0772 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java @@ -47,6 +47,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public static final int DEFAULT_GENERATION_ID = -1; public static final String DEFAULT_CONSUMER_ID = ""; + public static final long DEFAULT_TIMESTAMP = -1L; private final String groupId; private final int generationId; @@ -58,6 +59,11 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public final long timestamp; public final String metadata; + // for v0 + public PartitionData(long offset, String metadata) { + this(offset, DEFAULT_TIMESTAMP, metadata); + } + public PartitionData(long offset, long timestamp, String metadata) { this.offset = offset; this.timestamp = timestamp; @@ -73,7 +79,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { @Deprecated public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) { super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0))); - initCommonFields(groupId, offsetData); + initCommonFields(groupId, offsetData, 0); this.groupId = groupId; this.generationId = DEFAULT_GENERATION_ID; this.consumerId = DEFAULT_CONSUMER_ID; @@ -90,7 +96,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) { super(new Struct(curSchema)); - initCommonFields(groupId, offsetData); + initCommonFields(groupId, offsetData, 1); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(CONSUMER_ID_KEY_NAME, consumerId); this.groupId = groupId; @@ -99,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse { this.offsetData = offsetData; } - private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) { + private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData, int versionId) { Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData); struct.set(GROUP_ID_KEY_NAME, groupId); @@ -113,7 +119,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse { Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME); partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey()); partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset); - partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); + if (versionId == 1) + partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp); partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata); partitionArray.add(partitionData); } @@ -133,7 +140,12 @@ public class OffsetCommitRequest extends AbstractRequestResponse { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.getInt(PARTITION_KEY_NAME); long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME); - long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + long timestamp; + // timestamp only exists in v1 + if (partitionResponse.hasField(TIMESTAMP_KEY_NAME)) + timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME); + else + timestamp = DEFAULT_TIMESTAMP; String metadata = partitionResponse.getString(METADATA_KEY_NAME); PartitionData partitionData = new PartitionData(offset, timestamp, metadata); offsetData.put(new TopicPartition(topic, partition), partitionData); http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala index 861a6cf..39607c7 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala @@ -30,8 +30,6 @@ object OffsetCommitRequest extends Logging { val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetCommitRequest = { - val now = SystemTime.milliseconds - // Read values from the envelope val versionId = buffer.getShort assert(versionId == 0 || versionId == 1, @@ -59,8 +57,11 @@ object OffsetCommitRequest extends Logging { val partitionId = buffer.getInt val offset = buffer.getLong val timestamp = { - val given = buffer.getLong - if (given == -1L) now else given + if (versionId == 1) { + val given = buffer.getLong + given + } else + OffsetAndMetadata.InvalidTime } val metadata = readShortString(buffer) (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp)) @@ -68,6 +69,13 @@ object OffsetCommitRequest extends Logging { }) OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId) } + + def changeInvalidTimeToCurrentTime(offsetCommitRequest: OffsetCommitRequest) { + val now = SystemTime.milliseconds + for ( (topicAndPartiiton, offsetAndMetadata) <- offsetCommitRequest.requestInfo) + if (offsetAndMetadata.timestamp == OffsetAndMetadata.InvalidTime) + offsetAndMetadata.timestamp = now + } } case class OffsetCommitRequest(groupId: String, @@ -121,7 +129,8 @@ case class OffsetCommitRequest(groupId: String, t1._2.foreach( t2 => { buffer.putInt(t2._1.partition) buffer.putLong(t2._2.offset) - buffer.putLong(t2._2.timestamp) + if (versionId == 1) + buffer.putLong(t2._2.timestamp) writeShortString(buffer, t2._2.metadata) }) }) @@ -143,7 +152,7 @@ case class OffsetCommitRequest(groupId: String, innerCount + 4 /* partition */ + 8 /* offset */ + - 8 /* timestamp */ + + (if (versionId == 1) 8 else 0 ) /* timestamp */ + shortStringLength(offsetAndMetadata._2.metadata) }) }) http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala index 624a1c1..03dd736 100644 --- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala +++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala @@ -23,7 +23,7 @@ import kafka.utils.Logging import kafka.common.TopicAndPartition object OffsetCommitResponse extends Logging { - val CurrentVersion: Short = 0 + val CurrentVersion: Short = 1 def readFrom(buffer: ByteBuffer): OffsetCommitResponse = { val correlationId = buffer.getInt @@ -41,6 +41,9 @@ object OffsetCommitResponse extends Logging { } } +/** + * Single constructor for both version 0 and 1 since they have the same format. + */ case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short], correlationId: Int = 0) extends RequestOrResponse() { http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala index c7604b9..74ee829 100644 --- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala +++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala @@ -25,7 +25,9 @@ import kafka.network.{BoundedByteBufferSend, RequestChannel} import kafka.network.RequestChannel.Response import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition} object OffsetFetchRequest extends Logging { - val CurrentVersion: Short = 0 + // version 0 and 1 have exactly the same wire format, but different functionality. + // version 0 will read the offsets from ZK and version 1 will read the offsets from Kafka. + val CurrentVersion: Short = 1 val DefaultClientId = "" def readFrom(buffer: ByteBuffer): OffsetFetchRequest = { http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index 4cabffe..db7157d 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -19,7 +19,7 @@ package kafka.common case class OffsetAndMetadata(offset: Long, metadata: String = OffsetAndMetadata.NoMetadata, - timestamp: Long = -1L) { + var timestamp: Long = -1L) { override def toString = "OffsetAndMetadata[%d,%s%s]" .format(offset, if (metadata != null && metadata.length > 0) metadata else "NO_METADATA", http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 9a61fcb..d626b17 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -25,7 +25,7 @@ import kafka.network._ import kafka.admin.AdminUtils import kafka.network.RequestChannel.Response import kafka.controller.KafkaController -import kafka.utils.{SystemTime, Logging} +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging} import scala.collection._ @@ -64,7 +64,7 @@ class KafkaApis(val requestChannel: RequestChannel, case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request) case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request) case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request) - case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request) + case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request) case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request) case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) @@ -77,6 +77,40 @@ class KafkaApis(val requestChannel: RequestChannel, request.apiLocalCompleteTimeMs = SystemTime.milliseconds } + def handleOffsetCommitRequest(request: RequestChannel.Request) { + val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + if (offsetCommitRequest.versionId == 0) { + // version 0 stores the offsets in ZK + val responseInfo = offsetCommitRequest.requestInfo.map{ + case (topicAndPartition, metaAndError) => { + val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic) + try { + ensureTopicExists(topicAndPartition.topic) + if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) { + (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) + } else { + ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + + topicAndPartition.partition, metaAndError.offset.toString) + (topicAndPartition, ErrorMapping.NoError) + } + } catch { + case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])) + } + } + } + val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } else { + // version 1 and above store the offsets in a special Kafka topic + handleProducerOrOffsetCommitRequest(request) + } + } + + private def ensureTopicExists(topic: String) = { + if (metadataCache.getTopicMetadata(Set(topic)).size <= 0) + throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted") + } + def handleLeaderAndIsrRequest(request: RequestChannel.Request) { // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they @@ -154,6 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel, val (produceRequest, offsetCommitRequestOpt) = if (request.requestId == RequestKeys.OffsetCommitKey) { val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest] + OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest) (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest)) } else { (request.requestObj.asInstanceOf[ProducerRequest], None) @@ -504,22 +539,47 @@ class KafkaApis(val requestChannel: RequestChannel, def handleOffsetFetchRequest(request: RequestChannel.Request) { val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest] - val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => - metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty - ) - val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap - val knownStatus = - if (knownTopicPartitions.size > 0) - offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap - else - Map.empty[TopicAndPartition, OffsetMetadataAndError] - val status = unknownStatus ++ knownStatus - - val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) - - trace("Sending offset fetch response %s for correlation id %d to client %s." - .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + if (offsetFetchRequest.versionId == 0) { + // version 0 reads offsets from ZK + val responseInfo = offsetFetchRequest.requestInfo.map( t => { + val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic) + try { + ensureTopicExists(t.topic) + val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1 + payloadOpt match { + case Some(payload) => { + (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError)) + } + case None => (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, + ErrorMapping.UnknownTopicOrPartitionCode)) + } + } catch { + case e: Throwable => + (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, + ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))) + } + }) + val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } else { + // version 1 reads offsets from Kafka + val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition => + metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty + ) + val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap + val knownStatus = + if (knownTopicPartitions.size > 0) + offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap + else + Map.empty[TopicAndPartition, OffsetMetadataAndError] + val status = unknownStatus ++ knownStatus + + val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId) + + trace("Sending offset fetch response %s for correlation id %d to client %s." + .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + } } /* http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index cd16ced..4e817a2 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -163,8 +163,8 @@ object SerializationTestUtils { versionId = 0, groupId = "group 1", requestInfo = collection.immutable.Map( - TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds), - TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds) + TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata"), + TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata) )) }