satishd commented on code in PR #13984:
URL: https://github.com/apache/kafka/pull/13984#discussion_r1284146317


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -623,10 +632,30 @@ private void copyLogSegment(UnifiedLog log, LogSegment 
segment, long nextSegment
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
             
brokerTopicStats.topicStats(log.topicPartition().topic()).remoteCopyRequestRate().mark();
             brokerTopicStats.allTopicsStats().remoteCopyRequestRate().mark();
-            remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, 
segmentData);
+            Optional<CustomMetadata> customMetadata = 
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new 
RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
-                    RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+                    customMetadata, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+
+            if (customMetadata.isPresent()) {
+                long customMetadataSize = customMetadata.get().value().length;
+                if (customMetadataSize > this.customMetadataSizeLimit) {
+                    CustomMetadataSizeLimitExceededException e = new 
CustomMetadataSizeLimitExceededException();
+                    logger.error("Custom metadata size {} exceeds configured 
limit {}." +
+                                    " Copying will be stopped and copied 
segment will be attempted to clean." +
+                                    " Original metadata: {}",
+                            customMetadataSize, this.customMetadataSizeLimit, 
copySegmentStartedRlsm, e);
+                    try {
+                        // For deletion, we provide back the custom metadata 
by creating a new metadata object from the update.
+                        // However, the update itself will not be stored in 
this case.
+                        
remoteLogStorageManager.deleteLogSegmentData(copySegmentStartedRlsm.createWithUpdates(copySegmentFinishedRlsm));

Review Comment:
   That looks reasonable to me. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to