This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d65ca26  RATIS-1366. Fix NPE issues in MetaStateMachine (#471)
d65ca26 is described below

commit d65ca26a0291fc6067f860eff4ff3092d25c0aec
Author: Roni Juntunen <[email protected]>
AuthorDate: Fri May 14 05:03:10 2021 +0300

    RATIS-1366. Fix NPE issues in MetaStateMachine (#471)
---
 .../ratis/logservice/server/MetaStateMachine.java  | 70 ++++++++++++----------
 .../apache/ratis/statemachine/StateMachine.java    |  3 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 13 ++--
 .../ratis/statemachine/impl/BaseStateMachine.java  |  3 +-
 4 files changed, 48 insertions(+), 41 deletions(-)

diff --git 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index f122b43..b88a048 100644
--- 
a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ 
b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -128,14 +128,14 @@ public class MetaStateMachine extends BaseStateMachine {
     }
 
     @Override
-    @SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
-    public TransactionContext applyTransactionSerial(TransactionContext trx) {
+    public TransactionContext applyTransactionSerial(TransactionContext trx) 
throws InvalidProtocolBufferException {
         RaftProtos.LogEntryProto x = trx.getLogEntry();
         MetaSMRequestProto req = null;
         try {
             req = 
MetaSMRequestProto.parseFrom(x.getStateMachineLogEntry().getLogData());
         } catch (InvalidProtocolBufferException e) {
             e.printStackTrace();
+            throw e;
         }
         switch (req.getTypeCase()) {
             case REGISTERREQUEST:
@@ -202,48 +202,54 @@ public class MetaStateMachine extends BaseStateMachine {
         return super.applyTransaction(trx);
     }
 
+
     @Override
-    @SuppressFBWarnings("NP_NULL_ON_SOME_PATH")
     public CompletableFuture<Message> query(Message request) {
         Timer.Context timerContext = null;
         MetaServiceProtos.MetaServiceRequestProto.TypeCase type = null;
-        try {
-            if (currentGroup == null) {
-                try {
-                    List<RaftGroup> x =
+        if (currentGroup == null) {
+            try {
+                List<RaftGroup> x =
                         
StreamSupport.stream(raftServer.getGroups().spliterator(), false)
-                            .filter(group -> 
group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
-                    if (x.size() == 1) {
-                        currentGroup = x.get(0);
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
+                          .filter(group -> 
group.getGroupId().equals(metadataGroupId)).collect(Collectors.toList());
+                if (x.size() == 1) {
+                    currentGroup = x.get(0);
                 }
-            }
-
-            MetaServiceProtos.MetaServiceRequestProto req = null;
-            try {
-                req = 
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
-            } catch (InvalidProtocolBufferException e) {
+            } catch (IOException e) {
                 e.printStackTrace();
             }
-            type = req.getTypeCase();
+        }
+
+        MetaServiceProtos.MetaServiceRequestProto req;
+        try {
+            req = 
MetaServiceProtos.MetaServiceRequestProto.parseFrom(request.getContent());
+        } catch (InvalidProtocolBufferException e) {
+            e.printStackTrace();
+            return null;
+        }
+        type = req.getTypeCase();
+        // Main purpose of this try catch block is to make sure that
+        // timerContext.stop() is run after return.
+        try {
             timerContext = 
logServiceMetaDataMetrics.getTimer(type.name()).time();
             switch (type) {
 
-            case CREATELOG:
-                return processCreateLogRequest(req);
-            case LISTLOGS:
-                return processListLogsRequest();
-            case GETLOG:
-                return processGetLogRequest(req);
-            case DELETELOG:
-                return processDeleteLog(req);
-            default:
+                case CREATELOG:
+                    return processCreateLogRequest(req);
+                case LISTLOGS:
+                    return processListLogsRequest();
+                case GETLOG:
+                    return processGetLogRequest(req);
+                case DELETELOG:
+                    return processDeleteLog(req);
+                default:
+                    CompletableFuture<Message> reply = super.query(request);
+                    return reply;
             }
-            CompletableFuture<Message> reply = super.query(request);
-            return reply;
-        }finally{
+        } catch (Exception e) {
+            LOG.error("Exception during Meta State Machine query");
+            throw e;
+        } finally {
             if (timerContext != null) {
                 timerContext.stop();
             }
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 87e5a83..c0a8916 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -32,6 +32,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
@@ -444,7 +445,7 @@ public interface StateMachine extends Closeable {
    *            of the raft peers
    * @return The Transaction context.
    */
-  TransactionContext applyTransactionSerial(TransactionContext trx);
+  TransactionContext applyTransactionSerial(TransactionContext trx) throws 
InvalidProtocolBufferException;
 
   /**
    * Apply a committed log entry to the state machine. This method is called 
sequentially in
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index ccead21..483d0e1 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -53,6 +53,7 @@ import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
 import org.apache.ratis.server.util.ServerStringUtils;
@@ -1663,7 +1664,7 @@ class RaftServerImpl implements RaftServer.Division,
     });
   }
 
-  CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
+  CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws 
RaftLogIOException {
     if (!next.hasStateMachineLogEntry()) {
       stateMachine.event().notifyTermIndexUpdated(next.getTerm(), 
next.getIndex());
     }
@@ -1683,16 +1684,14 @@ class RaftServerImpl implements RaftServer.Division,
                   .setLogEntry(next)
                   .build());
 
-      // Let the StateMachine inject logic for committed transactions in 
sequential order.
-      trx = stateMachine.applyTransactionSerial(trx);
-
       try {
+        // Let the StateMachine inject logic for committed transactions in 
sequential order.
+        trx = stateMachine.applyTransactionSerial(trx);
+
         final CompletableFuture<Message> stateMachineFuture = 
stateMachine.applyTransaction(trx);
         return replyPendingRequest(next, stateMachineFuture);
       } catch (Exception e) {
-        LOG.error("{}: applyTransaction failed for index:{} proto:{}",
-            getMemberId(), next.getIndex(), 
LogProtoUtils.toLogEntryString(next), e);
-        throw e;
+        throw new RaftLogIOException(e);
       }
     }
     return null;
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 9e0f5c5..2dd41ee 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -34,6 +34,7 @@ import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
@@ -104,7 +105,7 @@ public class BaseStateMachine implements StateMachine, 
StateMachine.DataApi,
   }
 
   @Override
-  public TransactionContext applyTransactionSerial(TransactionContext trx) {
+  public TransactionContext applyTransactionSerial(TransactionContext trx) 
throws InvalidProtocolBufferException {
     return trx;
   }
 

Reply via email to