szetszwo commented on code in PR #1215:
URL: https://github.com/apache/ratis/pull/1215#discussion_r1931044138
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java:
##########
@@ -40,11 +40,122 @@
import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
/** Server utilities for internal use. */
public final class ServerImplUtils {
+ /** The consecutive indices within the same term. */
+ static class ConsecutiveIndices {
+ /** Convert the given entries to a list of {@link ConsecutiveIndices} */
+ static List<ConsecutiveIndices> convert(List<LogEntryProto> entries) {
+ if (entries == null || entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<ConsecutiveIndices> indices = null;
+
+ LogEntryProto previous = entries.get(0);
+ long startIndex = previous.getIndex();
+ int count = 1;
+
+ for (int i = 1; i < entries.size(); i++) {
+ final LogEntryProto current = entries.get(i);
+ // validate if the indices are consecutive
+ Preconditions.assertSame(previous.getIndex() + 1, current.getIndex(),
"index");
+
+ if (current.getTerm() == previous.getTerm()) {
+ count++;
+ } else {
+ // validate if the terms are increasing
+ Preconditions.assertTrue(previous.getTerm() < current.getTerm(),
"term");
+ if (indices == null) {
+ indices = new ArrayList<>();
+ }
+ indices.add(new ConsecutiveIndices(previous.getTerm(), startIndex,
count));
+
+ startIndex = current.getIndex();
+ count = 1;
+ }
+ previous = current;
+ }
+
+ final ConsecutiveIndices last = new
ConsecutiveIndices(previous.getTerm(), startIndex, count);
+ if (indices == null) {
+ return Collections.singletonList(last);
+ } else {
+ indices.add(last);
+ return indices;
+ }
+ }
+
+ private final long term;
+ private final long startIndex;
+ private final int count;
+
+ ConsecutiveIndices(long term, long startIndex, int count) {
+ Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0 ");
+ this.term = term;
+ this.startIndex = startIndex;
+ this.count = count;
+ }
+
+ long getNextIndex() {
+ return startIndex + count;
+ }
+
+ Long getTerm(long index) {
+ final long diff = index - startIndex;
+ return diff < 0 || diff >= count ? null: term;
+ }
+ }
+
+ /** A data structure to support the {@link #contains(TermIndex)} method. */
+ static class NavigableIndices {
+ private final NavigableMap<Long, ConsecutiveIndices> map = new TreeMap<>();
+
+ boolean contains(TermIndex ti) {
+ final Long term = getTerm(ti.getIndex());
+ return term != null && term == ti.getTerm();
+ }
+
+ synchronized Long getTerm(long index) {
+ if (map.isEmpty()) {
+ return null;
+ }
+
+ final Map.Entry<Long, ConsecutiveIndices> floorEntry =
map.floorEntry(index);
+ if (floorEntry == null) {
+ return null;
+ }
+ return floorEntry.getValue().getTerm(index);
+ }
+
+ synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
+ for(ConsecutiveIndices indices : entriesTermIndices) {
+ // validate index0
+ final long index0 = indices.startIndex;
+ final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
+ if (lastEntry != null) {
+ Preconditions.assertSame(lastEntry.getValue().getNextIndex(),
index0, "index0");
+ }
+ map.put(index0, indices);
Review Comment:
Let's remove the local variable `index0` and use `indices.startIndex` directly.
```diff
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 976a05008..c5010a534 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -138,13 +138,12 @@ public final class ServerImplUtils {
synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
for(ConsecutiveIndices indices : entriesTermIndices) {
- // validate index0
- final long index0 = indices.startIndex;
+ // validate startIndex
final Map.Entry<Long, ConsecutiveIndices> lastEntry =
map.lastEntry();
if (lastEntry != null) {
- Preconditions.assertSame(lastEntry.getValue().getNextIndex(),
index0, "index0");
+ Preconditions.assertSame(lastEntry.getValue().getNextIndex(),
indices.startIndex, "startIndex");
}
- map.put(index0, indices);
+ map.put(indices.startIndex, indices);
}
}
```
##########
ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java:
##########
@@ -1687,10 +1690,16 @@ leaderId, getMemberId(), currentTerm, followerCommit,
inconsistencyReplyNextInde
});
}
private CompletableFuture<Void>
appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
- entriesRef.retain();
+ final List<LogEntryProto> entries = entriesRef.retain();
+ final List<ConsecutiveIndices> entriesTermIndices =
ConsecutiveIndices.convert(entries);
+ appendLogTermIndices.append(entriesTermIndices);
+
Review Comment:
We actually need to retain twice: once for the synchronous part, which is released when the try block closes, and once more for the asynchronous part.
```java
private CompletableFuture<Void>
appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
final List<ConsecutiveIndices> entriesTermIndices;
try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =
entriesRef.retainAndReleaseOnClose()) {
entriesTermIndices = ConsecutiveIndices.convert(entries.get());
appendLogTermIndices.append(entriesTermIndices);
}
entriesRef.retain();
return appendLogFuture.updateAndGet(f -> f.thenCompose(
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]