This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d65ca26 RATIS-1366. Fix NPE issues in MetaStateMachine (#471)
d65ca26 is described below
commit d65ca26a0291fc6067f860eff4ff3092d25c0aec
Author: Roni Juntunen <[email protected]>
AuthorDate: Fri May 14 05:03:10 2021 +0300
RATIS-1366. Fix NPE issues in MetaStateMachine (#471)
---
.../ratis/logservice/server/MetaStateMachine.java | 70 ++++++++++++----------
.../apache/ratis/statemachine/StateMachine.java | 3 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 13 ++--
.../ratis/statemachine/impl/BaseStateMachine.java | 3 +-
4 files changed, 48 insertions(+), 41 deletions(-)
diff --git
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index f122b43..b88a048 100644
---
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -128,14 +128,14 @@ public class MetaStateMachine extends BaseStateMachine {
}
@Override
- @SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
- public TransactionContext applyTransactionSerial(TransactionContext trx) {
+ public TransactionContext applyTransactionSerial(TransactionContext trx)
throws InvalidProtocolBufferException {
RaftProtos.LogEntryProto x = trx.getLogEntry();
MetaSMRequestProto req = null;
try {
req =
MetaSMRequestProto.parseFrom(x.getStateMachineLogEntry().getLogData());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
+ throw e;
}
switch (req.getTypeCase()) {
case REGISTERREQUEST:
@@ -202,48 +202,54 @@ public class MetaStateMachine extends BaseStateMachine {
return super.applyTransaction(trx);
}
+
@Override
- @SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
public CompletableFuture<Message> query(Message request) {
Timer.Context timerContext = null;
MetaServiceProtos.MetaServiceRequestProto.TypeCase type = null;
- try {
- if (currentGroup == null) {
- try {
- List<RaftGroup> x =
+ if (currentGroup == null) {
+ try {
+ List<RaftGroup> x =
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
- .filter(group ->
group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
- if (x.size() == 1) {
- currentGroup = x.get(0);
- }
- } catch (IOException e) {
- e.printStackTrace();
+ .filter(group ->
group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
+ if (x.size() == 1) {
+ currentGroup = x.get(0);
}
- }
-
- MetaServiceProtos.MetaServiceRequestProto req = null;
- try {
- req =
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
- } catch (InvalidProtocolBufferException e) {
+ } catch (IOException e) {
e.printStackTrace();
}
- type = req.getTypeCase();
+ }
+
+ MetaServiceProtos.MetaServiceRequestProto req;
+ try {
+ req =
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ return null;
+ }
+ type = req.getTypeCase();
+ // Main purpose of this try catch block is to make sure that
+ // timerContext.stop() is run after return.
+ try {
timerContext =
logServiceMetaDataMetrics.getTimer(type.name()).time();
switch (type) {
- case CREATELOG:
- return processCreateLogRequest(req);
- case LISTLOGS:
- return processListLogsRequest();
- case GETLOG:
- return processGetLogRequest(req);
- case DELETELOG:
- return processDeleteLog(req);
- default:
+ case CREATELOG:
+ return processCreateLogRequest(req);
+ case LISTLOGS:
+ return processListLogsRequest();
+ case GETLOG:
+ return processGetLogRequest(req);
+ case DELETELOG:
+ return processDeleteLog(req);
+ default:
+ CompletableFuture<Message> reply = super.query(request);
+ return reply;
}
- CompletableFuture<Message> reply = super.query(request);
- return reply;
- }finally{
+ } catch (Exception e) {
+ LOG.error("Exception during Meta State Machine query");
+ throw e;
+ } finally {
if (timerContext != null) {
timerContext.stop();
}
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 87e5a83..c0a8916 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
@@ -444,7 +445,7 @@ public interface StateMachine extends Closeable {
* of the raft peers
* @return The Transaction context.
*/
- TransactionContext applyTransactionSerial(TransactionContext trx);
+ TransactionContext applyTransactionSerial(TransactionContext trx) throws
InvalidProtocolBufferException;
/**
* Apply a committed log entry to the state machine. This method is called
sequentially in
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ccead21..483d0e1 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -53,6 +53,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.server.util.ServerStringUtils;
@@ -1663,7 +1664,7 @@ class RaftServerImpl implements RaftServer.Division,
});
}
- CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
+ CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws
RaftLogIOException {
if (!next.hasStateMachineLogEntry()) {
stateMachine.event().notifyTermIndexUpdated(next.getTerm(),
next.getIndex());
}
@@ -1683,16 +1684,14 @@ class RaftServerImpl implements RaftServer.Division,
.setLogEntry(next)
.build());
- // Let the StateMachine inject logic for committed transactions in
sequential order.
- trx = stateMachine.applyTransactionSerial(trx);
-
try {
+ // Let the StateMachine inject logic for committed transactions in
sequential order.
+ trx = stateMachine.applyTransactionSerial(trx);
+
final CompletableFuture<Message> stateMachineFuture =
stateMachine.applyTransaction(trx);
return replyPendingRequest(next, stateMachineFuture);
} catch (Exception e) {
- LOG.error("{}: applyTransaction failed for index:{} proto:{}",
- getMemberId(), next.getIndex(),
LogProtoUtils.toLogEntryString(next), e);
- throw e;
+ throw new RaftLogIOException(e);
}
}
return null;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 9e0f5c5..2dd41ee 100644
---
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
+import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
@@ -104,7 +105,7 @@ public class BaseStateMachine implements StateMachine,
StateMachine.DataApi,
}
@Override
- public TransactionContext applyTransactionSerial(TransactionContext trx) {
+ public TransactionContext applyTransactionSerial(TransactionContext trx)
throws InvalidProtocolBufferException {
return trx;
}