This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54003fe00dd MINOR: Extract some codeblocks as methods to simplify 
readability (#18017)
54003fe00dd is described below

commit 54003fe00ddf8e701798e08f156b2bf341b585cc
Author: Oleksandr K. <[email protected]>
AuthorDate: Mon Dec 9 17:25:33 2024 +0200

    MINOR: Extract some codeblocks as methods to simplify readability (#18017)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../main/java/kafka/server/TierStateMachine.java    | 21 +++++++++++++++------
 .../kafka/server/share/SharePartitionManager.java   | 21 +++++----------------
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/core/src/main/java/kafka/server/TierStateMachine.java 
b/core/src/main/java/kafka/server/TierStateMachine.java
index f5f83cb240b..ddb19e86aec 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -229,12 +229,8 @@ public class TierStateMachine {
         }
 
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset)
-                .orElseThrow(() -> new RemoteStorageException("Couldn't build 
the state from remote store for partition: " + topicPartition +
-                        ", currentLeaderEpoch: " + currentLeaderEpoch +
-                        ", leaderLocalLogStartOffset: " + 
leaderLocalLogStartOffset +
-                        ", leaderLogStartOffset: " + leaderLogStartOffset +
-                        ", epoch: " + targetEpoch +
-                        "as the previous remote log segment metadata was not 
found"));
+                .orElseThrow(() -> buildRemoteStorageException(topicPartition, 
targetEpoch, currentLeaderEpoch,
+                        leaderLocalLogStartOffset, leaderLogStartOffset));
 
 
         // Build leader epoch cache, producer snapshots until 
remoteLogSegmentMetadata.endOffset() and start
@@ -265,4 +261,17 @@ public class TierStateMachine {
 
         return nextOffset;
     }
+
+    private RemoteStorageException buildRemoteStorageException(TopicPartition 
topicPartition,
+                                                               int targetEpoch,
+                                                               int 
currentLeaderEpoch,
+                                                               long 
leaderLocalLogStartOffset,
+                                                               long 
leaderLogStartOffset) {
+        String message = String.format(
+                "Couldn't build the state from remote store for partition: %s, 
currentLeaderEpoch: %d, " +
+                        "leaderLocalLogStartOffset: %d, leaderLogStartOffset: 
%d, epoch: %d as the previous remote log segment metadata was not found",
+                topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset, 
leaderLogStartOffset, targetEpoch
+        );
+        return new RemoteStorageException(message);
+    }
 }
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 709b7205f2a..93978745f4a 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -300,22 +300,7 @@ public class SharePartitionManager implements 
AutoCloseable {
             }
         });
 
-        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
-            futures.values().toArray(new CompletableFuture[0]));
-        return allFutures.thenApply(v -> {
-            Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
result = new HashMap<>();
-            futures.forEach((topicIdPartition, future) -> {
-                ShareAcknowledgeResponseData.PartitionData partitionData = new 
ShareAcknowledgeResponseData.PartitionData()
-                    .setPartitionIndex(topicIdPartition.partition());
-                Throwable t = future.join();
-                if (t != null) {
-                    partitionData.setErrorCode(Errors.forException(t).code())
-                        .setErrorMessage(t.getMessage());
-                }
-                result.put(topicIdPartition, partitionData);
-            });
-            return result;
-        });
+        return mapAcknowledgementFutures(futures);
     }
 
     /**
@@ -376,6 +361,10 @@ public class SharePartitionManager implements 
AutoCloseable {
             }
         });
 
+        return mapAcknowledgementFutures(futuresMap);
+    }
+
+    private CompletableFuture<Map<TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData>> 
mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> 
futuresMap) {
         CompletableFuture<Void> allFutures = CompletableFuture.allOf(
             futuresMap.values().toArray(new CompletableFuture[0]));
         return allFutures.thenApply(v -> {

Reply via email to