This is an automated email from the ASF dual-hosted git repository.
boyang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0358c21 minor stylish fixes to raft client (#10809)
0358c21 is described below
commit 0358c21ae4d78c2f035f5d0fb1ed087611ec2ba5
Author: Boyang Chen <[email protected]>
AuthorDate: Thu Jun 3 18:51:03 2021 -0700
minor stylish fixes to raft client (#10809)
Style fixes to KafkaRaftClient
Reviewers: Luke Chen <[email protected]>
---
.../org/apache/kafka/raft/KafkaRaftClient.java | 52 +++++++++-------------
1 file changed, 21 insertions(+), 31 deletions(-)
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index e0e7cb4..f004203 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -309,17 +309,15 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
for (ListenerContext listenerContext : listenerContexts) {
listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset
-> {
if (nextExpectedOffset < log.startOffset() &&
nextExpectedOffset < highWatermark) {
- SnapshotReader<T> snapshot =
latestSnapshot().orElseThrow(() -> {
- return new IllegalStateException(
- String.format(
- "Snapshot expected since next offset of %s is
%s, log start offset is %s and high-watermark is %s",
-
listenerContext.listener.getClass().getTypeName(),
- nextExpectedOffset,
- log.startOffset(),
- highWatermark
- )
- );
- });
+ SnapshotReader<T> snapshot =
latestSnapshot().orElseThrow(() -> new IllegalStateException(
+ String.format(
+ "Snapshot expected since next offset of %s is %s,
log start offset is %s and high-watermark is %s",
+ listenerContext.listener.getClass().getTypeName(),
+ nextExpectedOffset,
+ log.startOffset(),
+ highWatermark
+ )
+ ));
listenerContext.fireHandleSnapshot(snapshot);
}
});
@@ -347,14 +345,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
private void maybeFireHandleCommit(long baseOffset, int epoch, List<T>
records) {
for (ListenerContext listenerContext : listenerContexts) {
OptionalLong nextExpectedOffsetOpt =
listenerContext.nextExpectedOffset();
- if (!nextExpectedOffsetOpt.isPresent()) {
- continue;
- }
-
- long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
- if (nextExpectedOffset == baseOffset) {
- listenerContext.fireHandleCommit(baseOffset, epoch, records);
- }
+ nextExpectedOffsetOpt.ifPresent(nextOffset -> {
+ if (nextOffset == baseOffset)
+ listenerContext.fireHandleCommit(baseOffset, epoch,
records);
+ });
}
}
@@ -388,7 +382,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
if (quorum.isVoter()
&& quorum.remoteVoters().isEmpty()
&& !quorum.isCandidate()) {
-
transitionToCandidate(currentTimeMs);
}
} catch (IOException e) {
@@ -449,7 +442,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
- // We update the end offset before flushing so that parked fetches can return sooner
+ // We update the end offset before flushing so that parked fetches can return sooner.
updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
log.flush();
}
@@ -502,11 +495,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
resetConnections();
// After becoming a follower, we need to complete all pending fetches so that
- // they can be resent to the leader without waiting for their expiration
+ // they can be re-sent to the leader without waiting for their expirations
fetchPurgatory.completeAllExceptionally(new
NotLeaderOrFollowerException(
"Cannot process the fetch request because the node is no longer
the leader."));
- // Clearing the append purgatory should complete all future exceptionally since this node is no longer the leader
+ // Clearing the append purgatory should complete all futures exceptionally since this node is no longer the leader
appendPurgatory.completeAllExceptionally(new
NotLeaderOrFollowerException(
"Failed to receive sufficient acknowledgments for this append
before leader change."));
}
@@ -552,7 +545,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
if (!hasValidTopicPartition(request, log.topicPartition())) {
- // Until we support multi-raft, we treat topic partition mismatches as invalid requests
+ // Until we support multi-raft, we treat individual topic partition mismatches as invalid requests
return new
VoteResponseData().setErrorCode(Errors.INVALID_REQUEST.code());
}
@@ -638,7 +631,6 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
binaryExponentialElectionBackoffMs(state.retries())
);
}
-
}
} else {
logger.debug("Ignoring vote response {} since we are no longer
a candidate in epoch {}",
@@ -1072,7 +1064,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
FetchResponseData.EpochEndOffset divergingEpoch =
partitionResponse.divergingEpoch();
if (divergingEpoch.epoch() >= 0) {
// The leader is asking us to truncate before continuing
- OffsetAndEpoch divergingOffsetAndEpoch = new OffsetAndEpoch(
+ final OffsetAndEpoch divergingOffsetAndEpoch = new
OffsetAndEpoch(
divergingEpoch.endOffset(), divergingEpoch.epoch());
state.highWatermark().ifPresent(highWatermark -> {
@@ -1104,7 +1096,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
);
return false;
} else {
- OffsetAndEpoch snapshotId = new OffsetAndEpoch(
+ final OffsetAndEpoch snapshotId = new OffsetAndEpoch(
partitionResponse.snapshotId().endOffset(),
partitionResponse.snapshotId().epoch()
);
@@ -1193,7 +1185,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
*/
private FetchSnapshotResponseData handleFetchSnapshotRequest(
RaftRequest.Inbound requestMetadata
- ) throws IOException {
+ ) {
FetchSnapshotRequestData data = (FetchSnapshotRequestData)
requestMetadata.data;
if (!hasValidClusterId(data.clusterId())) {
@@ -2038,7 +2030,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
}
}
- private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) throws IOException {
+ private long maybeSendFetchOrFetchSnapshot(FollowerState state, long currentTimeMs) {
final Supplier<ApiMessage> requestSupplier;
if (state.fetchingSnapshot().isPresent()) {
@@ -2465,7 +2457,5 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
wakeup();
}
}
-
}
-
}