This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2139367b52 HDDS-11581. Remove duplicate ContainerStateMachine#RaftGroupId (#7312)
2139367b52 is described below
commit 2139367b52d68773f25d79abce0439323dde9671
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Oct 15 07:46:13 2024 +0800
HDDS-11581. Remove duplicate ContainerStateMachine#RaftGroupId (#7312)
---
.../server/ratis/ContainerStateMachine.java | 76 +++++++++++-----------
1 file changed, 37 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3a48ecaee5..1048ec5092 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -183,7 +183,6 @@ public class ContainerStateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
- private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private final ContainerController containerController;
private final XceiverServerRatis ratisServer;
@@ -218,7 +217,6 @@ public class ContainerStateMachine extends BaseStateMachine {
ConfigurationSource conf,
String threadNamePrefix) {
this.datanodeService = hddsDatanodeService;
- this.gid = gid;
this.dispatcher = dispatcher;
this.containerController = containerController;
this.ratisServer = ratisServer;
@@ -282,7 +280,7 @@ public class ContainerStateMachine extends BaseStateMachine {
throws IOException {
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
- ratisServer.notifyGroupAdd(gid);
+ ratisServer.notifyGroupAdd(id);
LOG.info("{}: initialize {}", server.getId(), id);
loadSnapshot(storage.getLatestSnapshot());
@@ -293,7 +291,7 @@ public class ContainerStateMachine extends BaseStateMachine {
if (snapshot == null) {
TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
LOG.info("{}: The snapshot info is null. Setting the last applied index " +
- "to:{}", gid, empty);
+ "to:{}", getGroupId(), empty);
setLastAppliedTermIndex(empty);
return empty.getIndex();
}
@@ -301,7 +299,7 @@ public class ContainerStateMachine extends BaseStateMachine {
final File snapshotFile = snapshot.getFile().getPath().toFile();
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
- LOG.info("{}: Setting the last applied index to {}", gid, last);
+ LOG.info("{}: Setting the last applied index to {}", getGroupId(), last);
setLastAppliedTermIndex(last);
// initialize the dispatcher with snapshot so that it build the missing
@@ -351,7 +349,7 @@ public class ContainerStateMachine extends BaseStateMachine {
long startTime = Time.monotonicNow();
if (!isStateMachineHealthy()) {
String msg =
- "Failed to take snapshot " + " for " + gid + " as the stateMachine"
+ "Failed to take snapshot " + " for " + getGroupId() + " as the stateMachine"
+ " is unhealthy. The last applied index is at " + ti;
StateMachineException sme = new StateMachineException(msg);
LOG.error(msg);
@@ -360,19 +358,19 @@ public class ContainerStateMachine extends BaseStateMachine {
if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
- LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
+ LOG.info("{}: Taking a snapshot at:{} file {}", getGroupId(), ti, snapshotFile);
try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
persistContainerSet(fos);
fos.flush();
// make sure the snapshot file is synced
fos.getFD().sync();
} catch (IOException ioe) {
- LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
+ LOG.error("{}: Failed to write snapshot at:{} file {}", getGroupId(), ti,
snapshotFile);
throw ioe;
}
LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms",
- gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+ getGroupId(), ti, snapshotFile, (Time.monotonicNow() - startTime));
return ti.getIndex();
}
return -1;
@@ -386,7 +384,7 @@ public class ContainerStateMachine extends BaseStateMachine {
final StateMachineLogEntryProto stateMachineLogEntry =
entry.getStateMachineLogEntry();
final ContainerCommandRequestProto logProto;
try {
- logProto = getContainerCommandRequestProto(gid, stateMachineLogEntry.getLogData());
+ logProto = getContainerCommandRequestProto(getGroupId(), stateMachineLogEntry.getLogData());
} catch (InvalidProtocolBufferException e) {
trx.setException(e);
return trx;
@@ -413,7 +411,7 @@ public class ContainerStateMachine extends BaseStateMachine {
long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
- Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+ Preconditions.checkArgument(request.getRaftGroupId().equals(getGroupId()));
final TransactionContext.Builder builder = TransactionContext.newBuilder()
.setClientRequest(request)
@@ -449,7 +447,7 @@ public class ContainerStateMachine extends BaseStateMachine {
final WriteChunkRequestProto.Builder commitWriteChunkProto =
WriteChunkRequestProto.newBuilder(write)
.clearData();
protoBuilder.setWriteChunk(commitWriteChunkProto)
- .setPipelineID(gid.getUuid().toString())
+ .setPipelineID(getGroupId().getUuid().toString())
.setTraceID(proto.getTraceID());
builder.setStateMachineData(write.getData());
@@ -491,20 +489,20 @@ public class ContainerStateMachine extends BaseStateMachine {
private ContainerCommandRequestProto message2ContainerCommandRequestProto(
Message message) throws InvalidProtocolBufferException {
- return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+ return ContainerCommandRequestMessage.toProto(message.getContent(), getGroupId());
}
private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
+ LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", getGroupId(),
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
}
ContainerCommandResponseProto response =
dispatcher.dispatch(requestProto, context);
if (LOG.isTraceEnabled()) {
- LOG.trace("{}: response {}", gid, response);
+ LOG.trace("{}: response {}", getGroupId(), response);
}
return response;
}
@@ -531,7 +529,7 @@ public class ContainerStateMachine extends BaseStateMachine {
RaftServer server = ratisServer.getServer();
Preconditions.checkArgument(!write.getData().isEmpty());
try {
- if (server.getDivision(gid).getInfo().isLeader()) {
+ if (server.getDivision(getGroupId()).getInfo().isLeader()) {
stateMachineDataCache.put(entryIndex, write.getData());
}
} catch (InterruptedException ioe) {
@@ -559,7 +557,7 @@ public class ContainerStateMachine extends BaseStateMachine {
return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
- "{} logIndex {} chunkName {}", gid, write.getBlockID(),
+ "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName(), e);
metrics.incNumWriteDataFails();
// write chunks go in parallel. It's possible that one write chunk
@@ -573,7 +571,7 @@ public class ContainerStateMachine extends BaseStateMachine {
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
- "{} logIndex {} chunkName {}", gid, write.getBlockID(),
+ "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
entryIndex, write.getChunkData().getChunkName());
}
// Remove the future once it finishes execution from the
@@ -587,7 +585,7 @@ public class ContainerStateMachine extends BaseStateMachine {
&& r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
StorageContainerException sce =
new StorageContainerException(r.getMessage(), r.getResult());
- LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
+ LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName() + " Error message: " +
r.getMessage() + " Container Result: " + r.getResult());
@@ -601,7 +599,7 @@ public class ContainerStateMachine extends BaseStateMachine {
metrics.incNumBytesWrittenCount(
requestProto.getWriteChunk().getChunkData().getLen());
if (LOG.isDebugEnabled()) {
- LOG.debug(gid +
+ LOG.debug(getGroupId() +
": writeChunk writeStateMachineData completed: blockId" +
write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
write.getChunkData().getChunkName());
@@ -622,7 +620,7 @@ public class ContainerStateMachine extends BaseStateMachine {
DispatcherContext context) throws StorageContainerException {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
- "traceID={}", gid, requestProto.getCmdType(),
+ "traceID={}", getGroupId(), requestProto.getCmdType(),
requestProto.getContainerID(), requestProto.getPipelineID(),
requestProto.getTraceID());
}
@@ -781,7 +779,7 @@ public class ContainerStateMachine extends BaseStateMachine {
new StorageContainerException(response.getMessage(),
response.getResult());
LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
- + "{} Container Result: {}", gid, response.getCmdType(), index,
+ + "{} Container Result: {}", getGroupId(), response.getCmdType(), index,
response.getMessage(), response.getResult());
stateMachineHealthy.set(false);
throw sce;
@@ -856,7 +854,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.map(TransactionContext::getStateMachineContext)
.orElse(null);
final ContainerCommandRequestProto requestProto = context != null ? context.getLogProto()
- : getContainerCommandRequestProto(gid, entry.getStateMachineLogEntry().getLogData());
+ : getContainerCommandRequestProto(getGroupId(), entry.getStateMachineLogEntry().getLogData());
if (requestProto.getCmdType() != Type.WriteChunk) {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
@@ -874,7 +872,7 @@ public class ContainerStateMachine extends BaseStateMachine {
return future;
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
- LOG.error("{} unable to read stateMachineData:", gid, e);
+ LOG.error("{} unable to read stateMachineData:", getGroupId(), e);
return completeExceptionally(e);
}
}
@@ -920,7 +918,7 @@ public class ContainerStateMachine extends BaseStateMachine {
// from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` originates from ratis.
if (allServer) {
if (datanodeService != null && !datanodeService.isStopped()) {
- LOG.info("{} is closed by ratis", gid);
+ LOG.info("{} is closed by ratis", getGroupId());
if (semaphore.tryAcquire()) {
// run with a different thread, so this raft group can be closed
Runnable runnable = () -> {
@@ -952,7 +950,7 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture.runAsync(runnable);
}
} else {
- LOG.info("{} is closed by HddsDatanodeService", gid);
+ LOG.info("{} is closed by HddsDatanodeService", getGroupId());
}
}
}
@@ -983,7 +981,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private void removeStateMachineDataIfNeeded(long index) {
if (waitOnBothFollowers) {
try {
- RaftServer.Division division = ratisServer.getServer().getDivision(gid);
+ RaftServer.Division division = ratisServer.getServer().getDivision(getGroupId());
if (division.getInfo().isLeader()) {
long minIndex = Arrays.stream(division.getInfo()
.getFollowerNextIndices()).min().getAsLong();
@@ -1041,7 +1039,7 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
final Consumer<Throwable> exceptionHandler = e -> {
- LOG.error(gid + ": failed to applyTransaction at logIndex " + index
+ LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + index
+ " for " + requestProto.getCmdType(), e);
stateMachineHealthy.compareAndSet(true, false);
metrics.incNumApplyTransactionsFails();
@@ -1069,7 +1067,7 @@ public class ContainerStateMachine extends BaseStateMachine {
new StorageContainerException(r.getMessage(), r.getResult());
LOG.error(
"gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
- + "{} Container Result: {}", gid, r.getCmdType(), index,
+ + "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
metrics.incNumApplyTransactionsFails();
// Since the applyTransaction now is completed exceptionally,
@@ -1078,12 +1076,12 @@ public class ContainerStateMachine extends BaseStateMachine {
// shutdown.
applyTransactionFuture.completeExceptionally(sce);
stateMachineHealthy.compareAndSet(true, false);
- ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+ ratisServer.handleApplyTransactionFailure(getGroupId(), trx.getServerRole());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(
"gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : "
- + "{} Container Result: {}", gid, r.getCmdType(), index,
+ + "{} Container Result: {}", getGroupId(), r.getCmdType(), index,
r.getMessage(), r.getResult());
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
@@ -1161,25 +1159,25 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer follower) {
- ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
+ ratisServer.handleFollowerSlowness(getGroupId(), roleInfoProto, follower);
}
@Override
public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
- ratisServer.handleNoLeader(gid, roleInfoProto);
+ ratisServer.handleNoLeader(getGroupId(), roleInfoProto);
}
@Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
- LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
+ LOG.error("{}: {} {}", getGroupId(), TermIndex.valueOf(failedEntry),
toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), t);
- ratisServer.handleNodeLogFailure(gid, t);
+ ratisServer.handleNodeLogFailure(getGroupId(), t);
}
@Override
public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
- ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
+ ratisServer.handleInstallSnapshotFromLeader(getGroupId(), roleInfoProto,
firstTermIndexInLog);
final CompletableFuture<TermIndex> future = new CompletableFuture<>();
future.complete(firstTermIndexInLog);
@@ -1188,7 +1186,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void notifyGroupRemove() {
- ratisServer.notifyGroupRemove(gid);
+ ratisServer.notifyGroupRemove(getGroupId());
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
@@ -1196,7 +1194,7 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid,
- "Ratis group removed. Group id: " + gid);
+ "Ratis group removed. Group id: " + getGroupId());
} catch (IOException e) {
LOG.debug("Failed to quasi-close container {}", cid);
}
@@ -1218,7 +1216,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
- return smProtoToString(gid, containerController, proto);
+ return smProtoToString(getGroupId(), containerController, proto);
}
public static String smProtoToString(RaftGroupId gid,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]