yandrey321 commented on code in PR #10365:
URL: https://github.com/apache/ozone/pull/10365#discussion_r3305755295
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java:
##########
@@ -685,6 +707,115 @@ private void
handleCommandResult(ContainerCommandRequestProto requestProto, long
}
}
+ /**
+ * Writes PutBlock metadata to RocksDB inside {@link
StateMachine.DataApi#write()}, before Raft quorum.
+ * This ensures every node that ACKs the log entry has already persisted the
block metadata,
+ * eliminating the gap where a follower crashes after the leader commits but
before the follower
+ * runs {@link #applyTransaction(TransactionContext)}.
+ *
+ * <p>Ordering: waits for all preceding WriteChunk {@code raftFuture}s to
complete before
+ * dispatching so that {@code finishWriteChunks()} inside {@link
KeyValueHandler#handlePutBlock}
+ * always sees fully written chunk files.
+ *
+ * <p>{@link #applyTransaction(TransactionContext)} becomes a no-op for
PutBlock because
+ * {@code persistPutBlock()} detects the already-written BCSID and returns
early.
+ */
+ private CompletableFuture<Message> writePutBlockData(
+ ContainerCommandRequestProto requestProto, long entryIndex, long term,
+ long startTime) {
+ final WriteFutures previous = writeChunkFutureMap.get(entryIndex);
+ if (previous != null) {
+ return previous.getRaftFuture();
+ }
+ try {
+ if
(ratisServer.getServer().getDivision(getGroupId()).getInfo().isLeader()) {
+ stateMachineDataCache.put(entryIndex, requestProto.toByteString());
+ }
+ } catch (InterruptedException ioe) {
+ Thread.currentThread().interrupt();
+ return completeExceptionally(ioe);
+ } catch (IOException ioe) {
+ return completeExceptionally(ioe);
+ }
+
+ // Snapshot preceding WriteChunk futures so finishWriteChunks() only runs
after chunk bytes are on disk.
+ final SortedMap<Long, WriteFutures> preceding =
writeChunkFutureMap.headMap(entryIndex, false);
+ final CompletableFuture<Void> precedingChunksDone = preceding.isEmpty()
+ ? CompletableFuture.completedFuture(null)
+ : CompletableFuture.allOf(preceding.values().stream()
+ .map(WriteFutures::getRaftFuture)
+ .toArray(CompletableFuture[]::new));
+
+ final DispatcherContext context = DispatcherContext
+ .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
+ .setTerm(term)
+ .setLogIndex(entryIndex)
+ .setContainer2BCSIDMap(container2BCSIDMap)
+ .build();
+
+ final CompletableFuture<Message> raftFuture = new CompletableFuture<>();
+
+ final CompletableFuture<ContainerCommandResponseProto> future =
containerTaskQueues.submit(
+ requestProto.getContainerID(),
+ () -> {
+ // Single try-finally ensures writeChunkFutureMap is always cleaned
up and
+ // raftFuture is always completed regardless of which code path
exits.
+ try {
+ try {
+ precedingChunksDone.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ StorageContainerException sce = new StorageContainerException(
+ "Interrupted waiting for preceding chunk writes at logIndex
" + entryIndex,
+ ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+ raftFuture.completeExceptionally(sce);
+ throw sce;
+ } catch (ExecutionException e) {
+ // A preceding write (possibly for a different container)
failed; the pipeline
+ // is already closing. Fail this entry so Ratis does not hang
waiting for the ACK.
+ StorageContainerException sce = new StorageContainerException(
+ "Preceding write failed at logIndex " + entryIndex + ": " +
e.getMessage(),
+ ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+ raftFuture.completeExceptionally(sce);
+ throw sce;
+ }
+ ContainerCommandResponseProto result =
dispatchCommand(requestProto, context);
+ if (result.getResult() != ContainerProtos.Result.SUCCESS
+ && result.getResult() !=
ContainerProtos.Result.CONTAINER_NOT_OPEN
+ && result.getResult() !=
ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+ StorageContainerException sce =
+ new StorageContainerException(result.getMessage(),
result.getResult());
+ LOG.error("{}: writePutBlockData failed at logIndex {}
containerID={}: {} {}",
+ getGroupId(), entryIndex, requestProto.getContainerID(),
+ result.getMessage(), result.getResult());
+ stateMachineHealthy.set(false);
+ raftFuture.completeExceptionally(sce);
+ } else {
+ raftFuture.complete(result::toByteString);
+ }
+ return result;
+ } catch (Exception e) {
+ if (!raftFuture.isDone()) {
+ LOG.error("{}: writePutBlockData failed at logIndex {}
containerID={}",
+ getGroupId(), entryIndex, requestProto.getContainerID(), e);
+ stateMachineHealthy.set(false);
+ raftFuture.completeExceptionally(e);
+ }
+ throw e;
+ } finally {
+ writeChunkFutureMap.remove(entryIndex);
+ }
+ },
+ executor);
+
+ writeChunkFutureMap.put(entryIndex, new WriteFutures(future, raftFuture,
startTime));
+ if (LOG.isDebugEnabled()) {
Review Comment:
There are no expensive calls or string formatting in this LOG.debug() call, so
the surrounding LOG.isDebugEnabled() guard is unnecessary: LOG.debug() checks
the log level internally and short-circuits the call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]