dajac commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1066999003


##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -234,4 +240,79 @@ class GroupCoordinatorAdapter(
     }
     CompletableFuture.completedFuture(results)
   }
+
+  override def commitOffsets(
+    context: RequestContext,
+    request: OffsetCommitRequestData,
+    bufferSupplier: BufferSupplier
+  ): CompletableFuture[OffsetCommitResponseData] = {
+    val currentTimeMs = time.milliseconds
+    val future = new CompletableFuture[OffsetCommitResponseData]()
+
+    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+      val response = new OffsetCommitResponseData()
+      val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
+
+      commitStatus.forKeyValue { (tp, error) =>
+        val topic = byTopics.get(tp.topic) match {
+          case Some(existingTopic) =>
+            existingTopic
+          case None =>
+            val newTopic = new 
OffsetCommitResponseData.OffsetCommitResponseTopic().setName(tp.topic)
+            byTopics += tp.topic -> newTopic
+            response.topics.add(newTopic)
+            newTopic
+        }
+
+        topic.partitions.add(new 
OffsetCommitResponseData.OffsetCommitResponsePartition()
+          .setPartitionIndex(tp.partition)
+          .setErrorCode(error.code))
+      }
+
+      future.complete(response)
+    }
+
+    // "default" expiration timestamp is defined as now + retention. The 
retention may be overridden
+    // in versions from v2 to v4. Otherwise, the retention defined on the 
broker is used. If an explicit
+    // commit timestamp is provided (v1 only), the expiration timestamp is 
computed based on that.
+    val expireTimeMs = request.retentionTimeMs match {
+      case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+      case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
+    }
+
+    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    request.topics.forEach { topic =>
+      topic.partitions.forEach { partition =>
+        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        partitions += tp -> new OffsetAndMetadata(
+          offset = partition.committedOffset,
+          leaderEpoch = partition.committedLeaderEpoch match {
+            case RecordBatch.NO_PARTITION_LEADER_EPOCH => 
Optional.empty[Integer]
+            case committedLeaderEpoch => 
Optional.of[Integer](committedLeaderEpoch)
+          },
+          metadata = partition.committedMetadata match {
+            case null => OffsetAndMetadata.NoMetadata
+            case metadata => metadata
+          },
+          commitTimestamp = partition.commitTimestamp match {
+            case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
+            case customTimestamp => customTimestamp
+          },

Review Comment:
   It seems that they are not validated anywhere. We basically store whatever 
we get. As a result, if the provided retention or the commit timestamp are 
negative, the offset will be expired immediately. This is inline with the 
behavior prior to this patch. We could improve it (if we want) separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to