Repository: hadoop Updated Branches: refs/heads/ozone-0.3 27bbbd0f9 -> d37705c0f
HDDS-716. Update ozone to latest ratis snapshot build(0.3.0-aa38160-SNAPSHOT). Contributed by Mukul Kumar Singh. (cherry picked from commit 0891cdda7961f7d0d7debdb8e89b7816f39f7c7b) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d37705c0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d37705c0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d37705c0 Branch: refs/heads/ozone-0.3 Commit: d37705c0f153a14ecdcc94b0727607c33a15e662 Parents: 27bbbd0 Author: Shashikant Banerjee <[email protected]> Authored: Wed Oct 24 16:04:57 2018 +0530 Committer: Arpit Agarwal <[email protected]> Committed: Wed Oct 24 08:34:34 2018 -0700 ---------------------------------------------------------------------- .../server/ratis/ContainerStateMachine.java | 72 +++++++++++++------- hadoop-project/pom.xml | 2 +- 2 files changed, 48 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37705c0/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index fa9fbf3..bcbf93f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -49,7 +50,7 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -207,7 +208,7 @@ public class ContainerStateMachine extends BaseStateMachine { final ContainerCommandRequestProto proto = getRequestProto(request.getMessage().getContent()); - final SMLogEntryProto log; + final StateMachineLogEntryProto log; if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); // create the state machine data proto @@ -237,23 +238,39 @@ public class ContainerStateMachine extends BaseStateMachine { .setWriteChunk(commitWriteChunkProto) .build(); - log = SMLogEntryProto.newBuilder() - .setData(commitContainerCommandProto.toByteString()) - .setStateMachineData(dataContainerCommandProto.toByteString()) - .build(); + log = createSMLogEntryProto(request, + commitContainerCommandProto.toByteString(), + dataContainerCommandProto.toByteString()); } else if (proto.getCmdType() == Type.CreateContainer) { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .setStateMachineData(request.getMessage().getContent()) - .build(); + log = createSMLogEntryProto(request, + request.getMessage().getContent(), request.getMessage().getContent()); } else { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build(); + log = createSMLogEntryProto(request, request.getMessage().getContent(), + null); } return new TransactionContextImpl(this, request, log); } + private StateMachineLogEntryProto createSMLogEntryProto(RaftClientRequest r, + ByteString logData, ByteString smData) { + StateMachineLogEntryProto.Builder builder = + StateMachineLogEntryProto.newBuilder(); + + builder.setCallId(r.getCallId()) + .setClientId(r.getClientId().toByteString()) + .setLogData(logData); + + if (smData != null) { + builder.setStateMachineEntry(StateMachineEntryProto.newBuilder() + .setStateMachineData(smData).build()); + } + return builder.build(); + } + + private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) { + return entryProto.getStateMachineEntry().getStateMachineData(); + } + private ContainerCommandRequestProto getRequestProto(ByteString request) throws InvalidProtocolBufferException { return ContainerCommandRequestProto.parseFrom(request); @@ -315,7 +332,7 @@ public class ContainerStateMachine extends BaseStateMachine { try { metrics.incNumWriteStateMachineOps(); final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getStateMachineData()); + getRequestProto(getStateMachineData(entry.getStateMachineLogEntry())); Type cmdType = requestProto.getCmdType(); switch (cmdType) { case CreateContainer: @@ -345,8 +362,8 @@ public class ContainerStateMachine extends BaseStateMachine { } } - private ByteString readStateMachineData(LogEntryProto entry, - ContainerCommandRequestProto requestProto) { + private ByteString readStateMachineData(ContainerCommandRequestProto + requestProto) { WriteChunkRequestProto writeChunkRequestProto = requestProto.getWriteChunk(); // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is @@ -361,7 +378,8 @@ public class ContainerStateMachine extends BaseStateMachine { .setChunkData(writeChunkRequestProto.getChunkData()); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) - .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto) + .setCmdType(Type.ReadChunk) + .setReadChunk(readChunkRequestProto) .build(); // read the chunk @@ -376,7 +394,8 @@ public class ContainerStateMachine extends BaseStateMachine { final WriteChunkRequestProto.Builder dataWriteChunkProto = WriteChunkRequestProto.newBuilder(writeChunkRequestProto) // adding the state machine data - .setData(responseProto.getData()).setStage(Stage.WRITE_DATA); + .setData(responseProto.getData()) + .setStage(Stage.WRITE_DATA); ContainerCommandRequestProto.Builder newStateMachineProto = ContainerCommandRequestProto.newBuilder(requestProto) @@ -410,21 +429,20 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public CompletableFuture<ByteString> readStateMachineData( LogEntryProto entry) { - SMLogEntryProto smLogEntryProto = entry.getSmLogEntry(); - if (!smLogEntryProto.getStateMachineData().isEmpty()) { + StateMachineLogEntryProto smLogEntryProto = entry.getStateMachineLogEntry(); + if (!getStateMachineData(smLogEntryProto).isEmpty()) { return CompletableFuture.completedFuture(ByteString.EMPTY); } try { final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getData()); + getRequestProto(entry.getStateMachineLogEntry().getLogData()); // readStateMachineData should only be called for "write" to Ratis. Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); if (requestProto.getCmdType() == Type.WriteChunk) { return CompletableFuture.supplyAsync(() -> - readStateMachineData(entry, requestProto), - chunkExecutor); + readStateMachineData(requestProto), chunkExecutor); } else if (requestProto.getCmdType() == Type.CreateContainer) { return CompletableFuture.completedFuture(requestProto.toByteString()); } else { @@ -462,7 +480,7 @@ public class ContainerStateMachine extends BaseStateMachine { try { metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = - getRequestProto(trx.getSMLogEntry().getData()); + getRequestProto(trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); CompletableFuture<Message> future; if (cmdType == Type.PutBlock) { @@ -490,6 +508,11 @@ public class ContainerStateMachine extends BaseStateMachine { .supplyAsync(() -> runCommand(containerCommandRequestProto), getCommandExecutor(requestProto)); } else { + // Make sure that in write chunk, the user data is not set + if (cmdType == Type.WriteChunk) { + Preconditions.checkArgument(requestProto + .getWriteChunk().getData().isEmpty()); + } future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), getCommandExecutor(requestProto)); } @@ -534,7 +557,6 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { - takeSnapshot(); for (int i = 0; i < numExecutors; i++) { executors[i].shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d37705c0/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index dae075f..22c3b35 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -101,7 +101,7 @@ <ldap-api.version>1.0.0-M33</ldap-api.version> <!-- Apache Ratis version --> - <ratis.version>0.3.0-9b2d7b6-SNAPSHOT</ratis.version> + <ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version> <jcache.version>1.0-alpha-1</jcache.version> <ehcache.version>3.3.1</ehcache.version> <hikari.version>2.4.12</hikari.version> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
