satishd commented on code in PR #13984: URL: https://github.com/apache/kafka/pull/13984#discussion_r1284146317
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment producerStateSnapshotFile.toPath(), leaderEpochsIndex); brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark(); brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark(); - remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); + Optional<CustomMetadata> customMetadata = remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData); RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(), - RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId); + customMetadata, RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId); + + if (customMetadata.isPresent()) { + long customMetadataSize = customMetadata.get().value().length; + if (customMetadataSize > this.customMetadataSizeLimit) { + CustomMetadataSizeLimitExceededException e = new CustomMetadataSizeLimitExceededException(); + logger.error("Custom metadata size {} exceeds configured limit {}." + + " Copying will be stopped and copied segment will be attempted to clean." + + " Original metadata: {}", + customMetadataSize, this.customMetadataSizeLimit, copySegmentStartedRlsm, e); + try { + // For deletion, we provide back the custom metadata by creating a new metadata object from the update. + // However, the update itself will not be stored in this case. + remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm)); Review Comment: That looks reasonable to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org