This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new aa6ed83 RATIS-473. Clarify size and length methods on LogStream
aa6ed83 is described below
commit aa6ed83a1940dec05f021c3340b316d3f7bce00a
Author: Josh Elser <[email protected]>
AuthorDate: Wed Jan 9 15:28:20 2019 -0500
RATIS-473. Clarify size and length methods on LogStream
getLength() was meant to be the number of records in a log
but was implemented as the size in bytes of the records in
the log. This commit adds a getSize() which takes the old
implementation of getLength(), and then reimplements
getLength() as the number of records in the log.
Signed-off-by: Vladimir Rodionov <[email protected]>
Signed-off-by: Rajeshbabu Chintaguntla <[email protected]>
---
.../org/apache/ratis/logservice/api/LogStream.java | 6 ++++
.../ratis/logservice/impl/LogStreamImpl.java | 35 +++++++++++++++-------
.../ratis/logservice/server/LogStateMachine.java | 30 +++++++++++++------
.../ratis/logservice/util/LogServiceProtoUtil.java | 18 +++++++++++
ratis-logservice/src/main/proto/LogService.proto | 12 ++++++++
.../ratis/logservice/LogServiceReadWriteBase.java | 10 +++----
6 files changed, 87 insertions(+), 24 deletions(-)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
index fbb977f..f1123af 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/api/LogStream.java
@@ -52,6 +52,12 @@ public interface LogStream extends AutoCloseable{
long getSize() throws IOException;
/**
+ * Returns the number of records in this log.
+ * @throws IOException
+ */
+ long getLength() throws IOException;
+
+ /**
* Creates a reader to read this LogStream.
*
* @return A synchronous reader
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 2ab0103..3f41c80 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.ratis.logservice.api.LogWriter;
import org.apache.ratis.logservice.api.RecordListener;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLastCommittedIndexReplyProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogLengthReplyProto;
+import org.apache.ratis.logservice.proto.LogServiceProtos.GetLogSizeReplyProto;
import
org.apache.ratis.logservice.proto.LogServiceProtos.GetLogStartIndexReplyProto;
import org.apache.ratis.logservice.proto.LogServiceProtos.LogServiceException;
import org.apache.ratis.logservice.util.LogServiceProtoUtil;
@@ -102,16 +103,30 @@ public class LogStreamImpl implements LogStream {
@Override
public long getSize() throws IOException{
- RaftClientReply reply = raftClient
- .sendReadOnly(Message.valueOf(LogServiceProtoUtil
- .toGetLengthRequestProto(name).toByteString()));
- GetLogLengthReplyProto proto =
- GetLogLengthReplyProto.parseFrom(reply.getMessage().getContent());
- if (proto.hasException()) {
- LogServiceException e = proto.getException();
- throw new IOException(e.getErrorMsg());
- }
- return proto.getLength();
+ RaftClientReply reply = raftClient
+ .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .toGetSizeRequestProto(name).toByteString()));
+ GetLogSizeReplyProto proto =
+ GetLogSizeReplyProto.parseFrom(reply.getMessage().getContent());
+ if (proto.hasException()) {
+ LogServiceException e = proto.getException();
+ throw new IOException(e.getErrorMsg());
+ }
+ return proto.getSize();
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ RaftClientReply reply = raftClient
+ .sendReadOnly(Message.valueOf(LogServiceProtoUtil
+ .toGetLengthRequestProto(name).toByteString()));
+ GetLogLengthReplyProto proto =
+ GetLogLengthReplyProto.parseFrom(reply.getMessage().getContent());
+ if (proto.hasException()) {
+ LogServiceException e = proto.getException();
+ throw new IOException(e.getErrorMsg());
+ }
+ return proto.getLength();
}
@Override
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 2ee5a53..900aac1 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -62,11 +62,10 @@ public class LogStateMachine extends BaseStateMachine {
}
/*
- * State is a pair log's length and state (closed/open);
+ * State is a log's length, size, and state (closed/open);
*/
-
+ private long size;
private long length;
-
private State state = State.OPEN;
private final SimpleStateMachineStorage storage = new
SimpleStateMachineStorage();
@@ -92,6 +91,7 @@ public class LogStateMachine extends BaseStateMachine {
*/
void reset() {
this.length = 0;
+ this.size = 0;
setLastAppliedTermIndex(null);
}
@@ -132,6 +132,7 @@ public class LogStateMachine extends BaseStateMachine {
final ObjectOutputStream out = new ObjectOutputStream(
new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
out.writeLong(length);
+ out.writeLong(size);
out.writeObject(state);
} catch(IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
@@ -165,6 +166,7 @@ public class LogStateMachine extends BaseStateMachine {
}
setLastAppliedTermIndex(last);
this.length = in.readLong();
+ this.size = in.readLong();
this.state = (State) in.readObject();
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
@@ -190,14 +192,16 @@ public class LogStateMachine extends BaseStateMachine {
case READNEXTQUERY:
return processReadRequest(logServiceRequestProto);
- case LENGTHQUERY:
- return processGetLengthRequest(logServiceRequestProto);
+ case SIZEREQUEST:
+ return processGetSizeRequest(logServiceRequestProto);
case STARTINDEXQUERY:
return processGetStartIndexRequest(logServiceRequestProto);
case GETSTATE:
return processGetStateRequest(logServiceRequestProto);
case LASTINDEXQUERY:
return processGetLastCommittedIndexRequest(logServiceRequestProto);
+ case LENGTHQUERY:
+ return processGetLengthRequest(logServiceRequestProto);
default:
// TODO
throw new RuntimeException(
@@ -246,6 +250,14 @@ public class LogStateMachine extends BaseStateMachine {
* @param msg message
* @return reply message
*/
+ private CompletableFuture<Message>
processGetSizeRequest(LogServiceRequestProto proto) {
+ GetLogSizeRequestProto msgProto = proto.getSizeRequest();
+ Throwable t = verifyState(State.OPEN);
+ LOG.debug("QUERY: {}, RESULT: {}", msgProto, this.size);
+ return CompletableFuture.completedFuture(Message
+ .valueOf(LogServiceProtoUtil.toGetLogSizeReplyProto(this.size,
t).toByteString()));
+ }
+
private CompletableFuture<Message>
processGetLengthRequest(LogServiceRequestProto proto) {
GetLogLengthRequestProto msgProto = proto.getLengthQuery();
Throwable t = verifyState(State.OPEN);
@@ -253,7 +265,6 @@ public class LogStateMachine extends BaseStateMachine {
return CompletableFuture.completedFuture(Message
.valueOf(LogServiceProtoUtil.toGetLogLengthReplyProto(this.length,
t).toByteString()));
}
-
/**
* Process read log entries request
* @param msg message
@@ -311,15 +322,16 @@ public class LogStateMachine extends BaseStateMachine {
final LogEntryProto entry = trx.getLogEntry();
AppendLogEntryRequestProto proto = logProto.getAppendRequest();
final long index = entry.getIndex();
- long total = 0;
+ long newSize = 0;
Throwable t = verifyState(State.OPEN);
if (t == null) {
try (final AutoCloseableLock writeLock = writeLock()) {
List<byte[]> entries =
LogServiceProtoUtil.toListByteArray(proto.getDataList());
for (byte[] bb : entries) {
- total += bb.length;
+ newSize += bb.length;
}
- this.length += total;
+ this.size += newSize;
+ this.length += entries.size();
// TODO do we need this for other write request (close, sync)
updateLastAppliedTermIndex(entry.getTerm(), index);
}
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
index 4a044c6..00b6e2a 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/util/LogServiceProtoUtil.java
@@ -75,6 +75,14 @@ public class LogServiceProtoUtil {
return builder.build();
}
+ public static LogServiceRequestProto toGetSizeRequestProto(LogName name) {
+ LogNameProto logNameProto =
+ LogNameProto.newBuilder().setName(name.getName()).build();
+ GetLogSizeRequestProto getLogSize = GetLogSizeRequestProto.newBuilder()
+ .setLogName(logNameProto).build();
+ return
LogServiceRequestProto.newBuilder().setSizeRequest(getLogSize).build();
+ }
+
public static LogServiceRequestProto toGetLengthRequestProto(LogName name) {
LogNameProto logNameProto =
LogNameProto.newBuilder().setName(name.getName()).build();
@@ -165,6 +173,16 @@ public class LogServiceProtoUtil {
return builder.build();
}
+ public static GetLogSizeReplyProto toGetLogSizeReplyProto(long size,
Throwable t) {
+ GetLogSizeReplyProto.Builder builder = GetLogSizeReplyProto.newBuilder();
+ if (t != null) {
+ builder.setException(toLogException(t));
+ } else {
+ builder.setSize(size);
+ }
+ return builder.build();
+ }
+
public static GetLogStartIndexReplyProto toGetLogStartIndexReplyProto(long
length, Throwable t) {
GetLogStartIndexReplyProto.Builder builder =
GetLogStartIndexReplyProto.newBuilder();
if (t != null) {
diff --git a/ratis-logservice/src/main/proto/LogService.proto
b/ratis-logservice/src/main/proto/LogService.proto
index da15aad..2dc4a32 100644
--- a/ratis-logservice/src/main/proto/LogService.proto
+++ b/ratis-logservice/src/main/proto/LogService.proto
@@ -112,6 +112,17 @@ message GetLogLengthReplyProto {
LogServiceException exception = 2;
}
+// Get the size of a log (in bytes) request
+message GetLogSizeRequestProto {
+ LogNameProto logName = 1;
+}
+
+// Get the size of a log (in bytes) reply
+message GetLogSizeReplyProto {
+ uint64 size = 1;
+ LogServiceException exception = 2;
+}
+
message GetLogStartIndexRequestProto {
LogNameProto logName = 1;
}
@@ -142,6 +153,7 @@ message LogServiceRequestProto {
AppendLogEntryRequestProto appendRequest = 6;
SyncLogRequestProto syncRequest = 7;
GetLogLastCommittedIndexRequestProto lastIndexQuery = 8;
+ GetLogSizeRequestProto sizeRequest = 9;
}
}
diff --git
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index fd71311..8c08f44 100644
---
a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++
b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -76,6 +76,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends
MiniRaftCluster>
assertEquals("log1", logStream.getName().getName());
assertEquals(State.OPEN, logStream.getState());
assertEquals(0, logStream.getSize());
+ assertEquals(0, logStream.getLength());
LogReader reader = logStream.createReader();
LogWriter writer = logStream.createWriter();
@@ -88,10 +89,9 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
long id = writer.write(records);
LOG.info("id {}", id);
- // Check log size
- long size = logStream.getSize();
- assertEquals(10 * 100, size);
- LOG.info("size {}", size);
+ // Check log size and length
+ assertEquals(10 * 100, logStream.getSize());
+ assertEquals(10, logStream.getLength());
// Check last record id
long lastId2 = logStream.getLastRecordId();
@@ -100,7 +100,7 @@ public abstract class LogServiceReadWriteBase<CLUSTER
extends MiniRaftCluster>
// Check first record id
long startId = logStream.getStartRecordId();
LOG.info("start id {}", startId);
- //
+
reader.seek(lastId + 1);
// Read records back
List<ByteBuffer> data = reader.readBulk(1);