dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1066999003
########## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ########## @@ -234,4 +240,79 @@ class GroupCoordinatorAdapter( } CompletableFuture.completedFuture(results) } + + override def commitOffsets( + context: RequestContext, + request: OffsetCommitRequestData, + bufferSupplier: BufferSupplier + ): CompletableFuture[OffsetCommitResponseData] = { + val currentTimeMs = time.milliseconds + val future = new CompletableFuture[OffsetCommitResponseData]() + + def callback(commitStatus: Map[TopicPartition, Errors]): Unit = { + val response = new OffsetCommitResponseData() + val byTopics = new mutable.HashMap[String, OffsetCommitResponseData.OffsetCommitResponseTopic]() + + commitStatus.forKeyValue { (tp, error) => + val topic = byTopics.get(tp.topic) match { + case Some(existingTopic) => + existingTopic + case None => + val newTopic = new OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> newTopic + response.topics.add(newTopic) + newTopic + } + + topic.partitions.add(new OffsetCommitResponseData.OffsetCommitResponsePartition() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + } + + future.complete(response) + } + + // "default" expiration timestamp is defined as now + retention. The retention may be overridden + // in versions from v2 to v4. Otherwise, the retention defined on the broker is used. If an explicit + // commit timestamp is provided (v1 only), the expiration timestamp is computed based on that. 
+ val expireTimeMs = request.retentionTimeMs match { + case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None + case retentionTimeMs => Some(currentTimeMs + retentionTimeMs) + } + + val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + request.topics.forEach { topic => + topic.partitions.forEach { partition => + val tp = new TopicPartition(topic.name, partition.partitionIndex) + partitions += tp -> new OffsetAndMetadata( + offset = partition.committedOffset, + leaderEpoch = partition.committedLeaderEpoch match { + case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] + case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) + }, + metadata = partition.committedMetadata match { + case null => OffsetAndMetadata.NoMetadata + case metadata => metadata + }, + commitTimestamp = partition.commitTimestamp match { + case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs + case customTimestamp => customTimestamp + }, Review Comment: It seems that they are not validated anywhere. We basically store whatever we get. As a result, if the provided retention or the commit timestamp is negative, the offset will be expired immediately. This is in line with the behavior prior to this patch. We could improve it (if we want) separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org