szetszwo commented on code in PR #762:
URL: https://github.com/apache/ratis/pull/762#discussion_r993561507
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java:
##########
@@ -88,18 +92,17 @@ public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSi
   public void write(LogEntryProto entry) throws IOException {
     final int serialized = entry.getSerializedSize();
     final int proto = CodedOutputStream.computeUInt32SizeNoTag(serialized) + serialized;
-    final byte[] buf = new byte[proto + 4]; // proto and 4-byte checksum
-    preallocateIfNecessary(buf.length);
+    preallocateIfNecessary(proto + 4); // proto and 4-byte checksum
Review Comment:
Allow `sharedBuffer` to be null and check `buf.length`:
```java
final int total = proto + 4; // proto and 4-byte checksum
final byte[] buf = sharedBuffer != null ? sharedBuffer.get() : new byte[total];
Preconditions.assertTrue(total <= buf.length,
    () -> "total = " + total + " > buf.length " + buf.length);
preallocateIfNecessary(total);
```
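A minimal, self-contained sketch of the suggestion above, assuming a nullable `Supplier<byte[]>`. The names `SharedBufferSketch` and `chooseBuffer` are hypothetical, and a plain `IllegalStateException` stands in for the `Preconditions.assertTrue` call:

```java
import java.util.function.Supplier;

/** Hypothetical sketch, not the Ratis class: pick the write buffer for one entry. */
final class SharedBufferSketch {
  /**
   * Returns the shared buffer when one is configured, otherwise a fresh array
   * of exactly {@code total} bytes. Fails if the shared buffer is too small.
   */
  static byte[] chooseBuffer(Supplier<byte[]> sharedBuffer, int total) {
    final byte[] buf = sharedBuffer != null ? sharedBuffer.get() : new byte[total];
    if (total > buf.length) {
      // Stand-in for Preconditions.assertTrue(...) in the review comment.
      throw new IllegalStateException("total = " + total + " > buf.length " + buf.length);
    }
    return buf;
  }

  public static void main(String[] args) {
    final byte[] shared = new byte[16];
    System.out.println(chooseBuffer(() -> shared, 10) == shared); // reuses the shared array
    System.out.println(chooseBuffer(null, 10).length);            // falls back to a fresh array
  }
}
```

Because the shared array may be longer than `total`, any code that consumes it would presumably have to restrict itself to the first `total` bytes.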
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java:
##########
@@ -58,13 +59,14 @@ public class SegmentedRaftLogOutputStream implements Closeable {
   private final long preallocatedSize;
   public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
-      long preallocatedSize, ByteBuffer byteBuffer, byte[] serializeBuf)
+      long preallocatedSize, ByteBuffer byteBuffer,
+      MemoizedSupplier<byte[]> serializeBuf)
Review Comment:
Use `Supplier<byte[]>` as the parameter type instead of `MemoizedSupplier<byte[]>`.
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogOutputStream.java:
##########
@@ -52,17 +53,20 @@ public class SegmentedRaftLogOutputStream implements Closeable {
private final File file;
private final BufferedWriteChannel out; // buffered FileChannel for writing
private final Checksum checksum;
+ private final byte[] serializeBuf;
Review Comment:
Rename it to `sharedBuffer` and declare it as a `Supplier`:
```java
private final Supplier<byte[]> sharedBuffer;
```
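A rough sketch of why the interface type is enough here: a memoizing supplier is still a `Supplier`, so the field does not need to name the concrete class. `SupplierFieldSketch` and its `memoize` helper are hypothetical stand-ins written for this example, not the Ratis `MemoizedSupplier`:

```java
import java.util.function.Supplier;

/** Hypothetical holder class showing a field typed as the plain interface. */
final class SupplierFieldSketch {
  /** Hypothetical memoizing wrapper: runs the initializer at most once. */
  static <T> Supplier<T> memoize(Supplier<T> initializer) {
    return new Supplier<T>() {
      private T value;

      @Override
      public synchronized T get() {
        if (value == null) {
          value = initializer.get(); // first call allocates, later calls reuse
        }
        return value;
      }
    };
  }

  // Declared with the interface type, as the review comment suggests.
  private final Supplier<byte[]> sharedBuffer;

  SupplierFieldSketch(Supplier<byte[]> sharedBuffer) {
    this.sharedBuffer = sharedBuffer;
  }

  byte[] buffer() {
    return sharedBuffer.get();
  }
}
```

With this shape, callers can pass a memoized supplier, a lambda, or a test double without the class depending on any of them.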
##########
ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java:
##########
@@ -264,7 +265,7 @@ public long takeSnapshot() {
     LOG.debug("Taking a snapshot with t:{}, i:{}, file:{}", termIndex.getTerm(),
         termIndex.getIndex(), snapshotFile);
     try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
-        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
+        segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize), MemoizedSupplier.valueOf(() -> new byte[1024]))) {
Review Comment:
Add a constructor so that we don't have to change the tests.
```java
public SegmentedRaftLogOutputStream(File file, boolean append, long segmentMaxSize,
    long preallocatedSize, ByteBuffer byteBuffer) throws IOException {
this(file, append, segmentMaxSize, preallocatedSize, byteBuffer, null);
}
```
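The delegation pattern can be sketched in isolation as follows. `OutputStreamSketch` is a hypothetical, trimmed-down stand-in with fewer parameters than the real class, and it opens no file:

```java
import java.io.File;
import java.nio.ByteBuffer;
import java.util.function.Supplier;

/** Hypothetical stand-in for the output stream; it only stores its arguments. */
final class OutputStreamSketch {
  private final File file;
  private final ByteBuffer byteBuffer;
  private final Supplier<byte[]> sharedBuffer; // may be null

  /** Existing call sites (for example tests) keep using this shorter signature. */
  OutputStreamSketch(File file, ByteBuffer byteBuffer) {
    this(file, byteBuffer, null); // no shared buffer: allocate per write instead
  }

  /** New signature for callers that want to reuse one serialization buffer. */
  OutputStreamSketch(File file, ByteBuffer byteBuffer, Supplier<byte[]> sharedBuffer) {
    this.file = file;
    this.byteBuffer = byteBuffer;
    this.sharedBuffer = sharedBuffer;
  }

  boolean hasSharedBuffer() {
    return sharedBuffer != null;
  }
}
```

This only works if the write path tolerates a null supplier, which is what the first comment in this review asks for.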
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java:
##########
@@ -149,6 +149,7 @@ synchronized void updateIndex(long i) {
private final StateMachine stateMachine;
private final SegmentedRaftLogMetrics raftLogMetrics;
private final ByteBuffer writeBuffer;
+ private final MemoizedSupplier<byte[]> serializeBuf;
Review Comment:
Rename it to `sharedBuffer` and declare it as a `Supplier`:
```java
private final Supplier<byte[]> sharedBuffer;
```
##########
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java:
##########
@@ -207,6 +208,8 @@ synchronized void updateIndex(long i) {
     final int bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
     this.writeBuffer = ByteBuffer.allocateDirect(bufferSize);
+    final int serializeBufSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
+    this.serializeBuf = MemoizedSupplier.valueOf(() -> new byte[serializeBufSize]);
Review Comment:
The buffer size should be `limit + 8`:
```java
final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum)
this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
```
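The arithmetic behind `limit + 8` can be checked with a small sketch. It assumes that a serialized entry never exceeds `logEntryLimit` and that the size prefix is a protobuf-style varint; `BufferSizeSketch` and its methods are hypothetical helpers, with `varintSize` written by hand in place of `CodedOutputStream.computeUInt32SizeNoTag`:

```java
/** Hypothetical helper for checking the shared-buffer size arithmetic. */
final class BufferSizeSketch {
  /** Bytes needed to encode {@code value} as an unsigned 32-bit varint (7 bits per byte). */
  static int varintSize(int value) {
    int size = 1;
    while ((value & ~0x7F) != 0) {
      value >>>= 7;
      size++;
    }
    return size;
  }

  /** Size prefix + serialized entry + 4-byte checksum, as in write(..). */
  static int totalBytes(int serialized) {
    return varintSize(serialized) + serialized + 4;
  }

  /** Buffer size proposed in the review comment. */
  static int sharedBufferSize(int logEntryLimit) {
    return logEntryLimit + 8;
  }

  public static void main(String[] args) {
    final int limit = 4 << 20; // 4 MiB, an arbitrary example value
    System.out.println(totalBytes(limit) <= sharedBufferSize(limit));
  }
}
```

In this sketch the prefix takes at most 4 bytes while the entry is smaller than 2^28 bytes (256 MiB), so `limit + 8` is sufficient for limits below that; a 5-byte prefix would only appear beyond it.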