This is an automated email from the ASF dual-hosted git repository.
aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 91cc197 HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. (#1048)
91cc197 is described below
commit 91cc19722796877f134fd04f60229ac47a1bd6e0
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Tue Jul 2 16:02:27 2019 -0700
HDDS-1757. Use ExecutorService in OzoneManagerStateMachine. (#1048)
---
.../ozone/om/ratis/OzoneManagerStateMachine.java | 43 +++++++++++++++++++++-
1 file changed, 41 insertions(+), 2 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 1bd5a70..d9eb0b9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -19,11 +19,16 @@ package org.apache.hadoop.ozone.om.ratis;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -38,6 +43,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandler;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerHARequestHandlerImpl;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -70,6 +76,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
private RaftGroupId raftGroupId;
private long lastAppliedIndex = 0;
private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
+ private final ExecutorService executorService;
public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
this.omRatisServer = ratisServer;
@@ -79,6 +86,9 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
this::updateLastAppliedIndex);
this.handler = new OzoneManagerHARequestHandlerImpl(ozoneManager,
ozoneManagerDoubleBuffer);
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("OM StateMachine ApplyTransaction Thread - %d").build();
+ this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
}
/**
@@ -132,8 +142,36 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
trx.getStateMachineLogEntry().getLogData());
long trxLogIndex = trx.getLogEntry().getIndex();
- CompletableFuture<Message> future = CompletableFuture
- .supplyAsync(() -> runCommand(request, trxLogIndex));
+ // In the current approach we have a single global executor with a
+ // single thread. This is done for correctness: applyTransaction runs on
+ // every OM, and we want to execute the transactions in the same order on
+ // all OMs; otherwise there is a chance that the OM replicas go out of
+ // sync.
+ // TODO: This makes all applyTransactions in the OM run in serial
+ // order. Revisit this in the future to use multiple executors, e.g. per
+ // volume/bucket.
+
+ // The reason for not immediately implementing an executor per volume is
+ // that if one executor's operations are slow, we cannot advance the
+ // lastAppliedIndex in the OzoneManager StateMachine, even if another
+ // executor has completed transactions with higher indexes.
+
+ // For example, suppose there are 300 transactions, 150 per volume:
+ // Volume1 has transactions 0 - 149 and Volume2 has transactions
+ // 150 - 299.
+ // Executor1 - Volume1 - 100 (last completed transaction)
+ // Executor2 - Volume2 - 299 (last completed transaction)
+
+ // Now transactions 0 - 100 and 150 - 299 have been applied. We
+ // cannot update lastAppliedIndex to 299; we have to set it to 100,
+ // since 101 - 149 have not been applied. When the OM restarts it will
+ // apply transactions from lastAppliedIndex.
+ // We can only advance lastAppliedIndex from 100 to 299 after
+ // transactions 101 - 149 complete. As a first step, we start with a
+ // single global executor and will revisit this when needed.
+
+ CompletableFuture<Message> future = CompletableFuture.supplyAsync(
+ () -> runCommand(request, trxLogIndex), executorService);
return future;
} catch (IOException e) {
return completeExceptionally(e);
@@ -301,6 +339,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
public void stop() {
ozoneManagerDoubleBuffer.stop();
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]