This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 54003fe00dd MINOR: Extract some codeblocks as methods to simplify
readability (#18017)
54003fe00dd is described below
commit 54003fe00ddf8e701798e08f156b2bf341b585cc
Author: Oleksandr K. <[email protected]>
AuthorDate: Mon Dec 9 17:25:33 2024 +0200
MINOR: Extract some codeblocks as methods to simplify readability (#18017)
Reviewers: Andrew Schofield <[email protected]>
---
.../main/java/kafka/server/TierStateMachine.java | 21 +++++++++++++++------
.../kafka/server/share/SharePartitionManager.java | 21 +++++----------------
2 files changed, 20 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/kafka/server/TierStateMachine.java
b/core/src/main/java/kafka/server/TierStateMachine.java
index f5f83cb240b..ddb19e86aec 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -229,12 +229,8 @@ public class TierStateMachine {
}
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch,
previousOffsetToLeaderLocalLogStartOffset)
- .orElseThrow(() -> new RemoteStorageException("Couldn't build
the state from remote store for partition: " + topicPartition +
- ", currentLeaderEpoch: " + currentLeaderEpoch +
- ", leaderLocalLogStartOffset: " +
leaderLocalLogStartOffset +
- ", leaderLogStartOffset: " + leaderLogStartOffset +
- ", epoch: " + targetEpoch +
- "as the previous remote log segment metadata was not
found"));
+ .orElseThrow(() -> buildRemoteStorageException(topicPartition,
targetEpoch, currentLeaderEpoch,
+ leaderLocalLogStartOffset, leaderLogStartOffset));
// Build leader epoch cache, producer snapshots until
remoteLogSegmentMetadata.endOffset() and start
@@ -265,4 +261,17 @@ public class TierStateMachine {
return nextOffset;
}
+
+ private RemoteStorageException buildRemoteStorageException(TopicPartition
topicPartition,
+ int targetEpoch,
+ int
currentLeaderEpoch,
+ long
leaderLocalLogStartOffset,
+ long
leaderLogStartOffset) {
+ String message = String.format(
+ "Couldn't build the state from remote store for partition: %s,
currentLeaderEpoch: %d, " +
+ "leaderLocalLogStartOffset: %d, leaderLogStartOffset:
%d, epoch: %d as the previous remote log segment metadata was not found",
+ topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset,
leaderLogStartOffset, targetEpoch
+ );
+ return new RemoteStorageException(message);
+ }
}
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 709b7205f2a..93978745f4a 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -300,22 +300,7 @@ public class SharePartitionManager implements
AutoCloseable {
}
});
- CompletableFuture<Void> allFutures = CompletableFuture.allOf(
- futures.values().toArray(new CompletableFuture[0]));
- return allFutures.thenApply(v -> {
- Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
result = new HashMap<>();
- futures.forEach((topicIdPartition, future) -> {
- ShareAcknowledgeResponseData.PartitionData partitionData = new
ShareAcknowledgeResponseData.PartitionData()
- .setPartitionIndex(topicIdPartition.partition());
- Throwable t = future.join();
- if (t != null) {
- partitionData.setErrorCode(Errors.forException(t).code())
- .setErrorMessage(t.getMessage());
- }
- result.put(topicIdPartition, partitionData);
- });
- return result;
- });
+ return mapAcknowledgementFutures(futures);
}
/**
@@ -376,6 +361,10 @@ public class SharePartitionManager implements
AutoCloseable {
}
});
+ return mapAcknowledgementFutures(futuresMap);
+ }
+
+ private CompletableFuture<Map<TopicIdPartition,
ShareAcknowledgeResponseData.PartitionData>>
mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>>
futuresMap) {
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futuresMap.values().toArray(new CompletableFuture[0]));
return allFutures.thenApply(v -> {