This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new 9765a6a134 HDDS-11253. Handle corrupted merkle tree files (#7083)
9765a6a134 is described below

commit 9765a6a1347eb88c93f6423b8d6b4d9e6d55c115
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Sep 10 13:37:06 2024 -0400

    HDDS-11253. Handle corrupted merkle tree files (#7083)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   1 +
 .../ContainerCommandResponseBuilders.java          |  10 +-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  23 +-
 .../org/apache/hadoop/ozone/audit/DNAction.java    |   2 +-
 .../checksum/ContainerChecksumTreeManager.java     | 154 +++++++------
 .../checksum/DNContainerOperationClient.java       |  27 ++-
 .../container/common/impl/HddsDispatcher.java      |   2 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  27 ++-
 .../checksum/ContainerMerkleTreeTestUtils.java     |  23 ++
 .../checksum/TestContainerChecksumTreeManager.java | 124 ++++++++---
 .../container/keyvalue/TestKeyValueHandler.java    |  34 +++
 .../src/main/proto/DatanodeClientProtocol.proto    |  12 +-
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |  10 +-
 .../TestContainerCommandReconciliation.java        | 248 ++++++++++++++-------
 14 files changed, 496 insertions(+), 201 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 794b972f15..ff0cef43c9 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -424,6 +424,7 @@ public final class HddsUtils {
     case ListContainer:
     case ListChunk:
     case GetCommittedBlockLength:
+    case GetContainerChecksumInfo:
       return true;
     case CloseContainer:
     case WriteChunk:
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index f3476b72e3..f0b483900d 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -371,14 +371,14 @@ public final class ContainerCommandResponseBuilders {
   }
 
   public static ContainerCommandResponseProto 
getGetContainerMerkleTreeResponse(
-      ContainerCommandRequestProto request, ByteString checksumTree) {
+      ContainerCommandRequestProto request, ByteString checksumInfo) {
 
-    ContainerProtos.GetContainerMerkleTreeResponseProto.Builder 
containerMerkleTree =
-        ContainerProtos.GetContainerMerkleTreeResponseProto.newBuilder()
+    ContainerProtos.GetContainerChecksumInfoResponseProto.Builder 
containerMerkleTree =
+        ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
             .setContainerID(request.getContainerID())
-            .setContainerMerkleTree(checksumTree);
+            .setContainerChecksumInfo(checksumInfo);
     return getSuccessResponseBuilder(request)
-        .setGetContainerMerkleTree(containerMerkleTree).build();
+        .setGetContainerChecksumInfo(containerMerkleTree).build();
   }
 
   private ContainerCommandResponseBuilders() {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 2a6ef2dea5..9b44051e06 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -768,15 +768,21 @@ public final class ContainerProtocolCalls  {
   }
 
   /**
-   * Gets the Container merkle tree for a container from a datanode.
+   * Gets the container checksum info for a container from a datanode. This 
method does not deserialize the checksum
+   * info.
+   *
    * @param client - client that communicates with the container
    * @param containerID - Container Id of the container
    * @param encodedContainerID - Encoded token if security is enabled
+   *
+   * @throws IOException For errors communicating with the datanode.
+   * @throws StorageContainerException For errors obtaining the checksum info, 
including the file being missing or
+   * empty on the datanode, or the datanode not having a replica of the 
container.
    */
-  public static ContainerProtos.GetContainerMerkleTreeResponseProto 
getContainerMerkleTree(
+  public static ContainerProtos.GetContainerChecksumInfoResponseProto 
getContainerChecksumInfo(
       XceiverClientSpi client, long containerID, String encodedContainerID) 
throws IOException {
-    ContainerProtos.GetContainerMerkleTreeRequestProto 
containerMerkleTreeRequestProto =
-        ContainerProtos.GetContainerMerkleTreeRequestProto
+    ContainerProtos.GetContainerChecksumInfoRequestProto 
containerChecksumRequestProto =
+        ContainerProtos.GetContainerChecksumInfoRequestProto
             .newBuilder()
             .setContainerID(containerID)
             .build();
@@ -784,10 +790,10 @@ public final class ContainerProtocolCalls  {
 
     ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
         .newBuilder()
-        .setCmdType(Type.GetContainerMerkleTree)
+        .setCmdType(Type.GetContainerChecksumInfo)
         .setContainerID(containerID)
         .setDatanodeUuid(id)
-        .setGetContainerMerkleTree(containerMerkleTreeRequestProto);
+        .setGetContainerChecksumInfo(containerChecksumRequestProto);
     if (encodedContainerID != null) {
       builder.setEncodedToken(encodedContainerID);
     }
@@ -796,9 +802,8 @@ public final class ContainerProtocolCalls  {
       builder.setTraceID(traceId);
     }
     ContainerCommandRequestProto request = builder.build();
-    ContainerCommandResponseProto response =
-        client.sendCommand(request, getValidatorList());
-    return response.getGetContainerMerkleTree();
+    ContainerCommandResponseProto response = client.sendCommand(request, 
getValidatorList());
+    return response.getGetContainerChecksumInfo();
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index be594062d0..b4e1687c28 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -42,7 +42,7 @@ public enum DNAction implements AuditAction {
   STREAM_INIT,
   FINALIZE_BLOCK,
   ECHO,
-  GET_CONTAINER_MERKLE_TREE;
+  GET_CONTAINER_CHECKSUM_INFO;
 
   @Override
   public String getAction() {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
index 4902878d74..769515d96e 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -29,18 +29,22 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
 import java.util.Collection;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.util.concurrent.Striped;
 import org.apache.hadoop.hdds.utils.SimpleStriped;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static org.apache.hadoop.ozone.util.MetricUtil.captureLatencyNs;
 
 /**
@@ -50,17 +54,18 @@ public class ContainerChecksumTreeManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerChecksumTreeManager.class);
 
-  // Used to coordinate reads and writes to each container's checksum file.
+  // Used to coordinate writes to each container's checksum file.
   // Each container ID is mapped to a stripe.
-  private final Striped<ReadWriteLock> fileLock;
+  // The file is atomically renamed into place, so readers do not need 
coordination.
+  private final Striped<Lock> fileLock;
   private final ContainerMerkleTreeMetrics metrics;
 
   /**
    * Creates one instance that should be used to coordinate all container 
checksum info within a datanode.
    */
   public ContainerChecksumTreeManager(ConfigurationSource conf) {
-    fileLock = SimpleStriped.readWriteLock(
-        
conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(), 
true);
+    fileLock = 
SimpleStriped.custom(conf.getObject(DatanodeConfiguration.class).getContainerChecksumLockStripes(),
+        () -> new ReentrantLock(true));
     metrics = ContainerMerkleTreeMetrics.create();
   }
 
@@ -75,14 +80,26 @@ public class ContainerChecksumTreeManager {
    * Concurrent writes to the same file are coordinated internally.
    */
   public void writeContainerDataTree(ContainerData data, ContainerMerkleTree 
tree) throws IOException {
-    Lock writeLock = getWriteLock(data.getContainerID());
+    long containerID = data.getContainerID();
+    Lock writeLock = getLock(containerID);
     writeLock.lock();
     try {
-      ContainerProtos.ContainerChecksumInfo newChecksumInfo = 
read(data).toBuilder()
-          
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
 tree::toProto))
-          .build();
-      write(data, newChecksumInfo);
-      LOG.debug("Data merkle tree for container {} updated", 
data.getContainerID());
+      ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
+      try {
+        // If the file is not present, we will create the data for the first 
time. This happens under a write lock.
+        checksumInfoBuilder = read(data)
+            .orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
+      } catch (IOException ex) {
+        LOG.error("Failed to read container checksum tree file for container 
{}. Overwriting it with a new instance.",
+            containerID, ex);
+        checksumInfoBuilder = 
ContainerProtos.ContainerChecksumInfo.newBuilder();
+      }
+
+      checksumInfoBuilder
+          .setContainerID(containerID)
+          
.setContainerMerkleTree(captureLatencyNs(metrics.getCreateMerkleTreeLatencyNS(),
 tree::toProto));
+      write(data, checksumInfoBuilder.build());
+      LOG.debug("Data merkle tree for container {} updated", containerID);
     } finally {
       writeLock.unlock();
     }
@@ -94,21 +111,33 @@ public class ContainerChecksumTreeManager {
    * Concurrent writes to the same file are coordinated internally.
    */
   public void markBlocksAsDeleted(KeyValueContainerData data, Collection<Long> 
deletedBlockIDs) throws IOException {
-    Lock writeLock = getWriteLock(data.getContainerID());
+    long containerID = data.getContainerID();
+    Lock writeLock = getLock(containerID);
     writeLock.lock();
     try {
-      ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = 
read(data).toBuilder();
+      ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = null;
+      try {
+        // If the file is not present, we will create the data for the first 
time. This happens under a write lock.
+        checksumInfoBuilder = read(data)
+            .orElse(ContainerProtos.ContainerChecksumInfo.newBuilder());
+      } catch (IOException ex) {
+        LOG.error("Failed to read container checksum tree file for container 
{}. Overwriting it with a new instance.",
+            data.getContainerID(), ex);
+        checksumInfoBuilder = 
ContainerProtos.ContainerChecksumInfo.newBuilder();
+      }
+
       // Although the persisted block list should already be sorted, we will 
sort it here to make sure.
       // This will automatically fix any bugs in the persisted order that may 
show up.
       SortedSet<Long> sortedDeletedBlockIDs = new 
TreeSet<>(checksumInfoBuilder.getDeletedBlocksList());
       sortedDeletedBlockIDs.addAll(deletedBlockIDs);
 
       checksumInfoBuilder
+          .setContainerID(containerID)
           .clearDeletedBlocks()
-          .addAllDeletedBlocks(sortedDeletedBlockIDs)
-          .build();
+          .addAllDeletedBlocks(sortedDeletedBlockIDs);
       write(data, checksumInfoBuilder.build());
-      LOG.debug("Deleted block list for container {} updated", 
data.getContainerID());
+      LOG.debug("Deleted block list for container {} updated with {} new 
blocks", data.getContainerID(),
+          sortedDeletedBlockIDs.size());
     } finally {
       writeLock.unlock();
     }
@@ -125,83 +154,80 @@ public class ContainerChecksumTreeManager {
   /**
    * Returns the container checksum tree file for the specified container 
without deserializing it.
    */
+  @VisibleForTesting
   public static File getContainerChecksumFile(ContainerData data) {
     return new File(data.getMetadataPath(), data.getContainerID() + ".tree");
   }
 
-  private Lock getReadLock(long containerID) {
-    return fileLock.get(containerID).readLock();
+  @VisibleForTesting
+  public static File getTmpContainerChecksumFile(ContainerData data) {
+    return new File(data.getMetadataPath(), data.getContainerID() + 
".tree.tmp");
   }
 
-  private Lock getWriteLock(long containerID) {
-    return fileLock.get(containerID).writeLock();
+  private Lock getLock(long containerID) {
+    return fileLock.get(containerID);
   }
 
-  private ContainerProtos.ContainerChecksumInfo read(ContainerData data) 
throws IOException {
+  /**
+   * Callers are not required to hold a lock while calling this since writes 
are done to a tmp file and atomically
+   * swapped into place.
+   */
+  private Optional<ContainerProtos.ContainerChecksumInfo.Builder> 
read(ContainerData data) throws IOException {
     long containerID = data.getContainerID();
-    Lock readLock = getReadLock(containerID);
-    readLock.lock();
+    File checksumFile = getContainerChecksumFile(data);
     try {
-      File checksumFile = getContainerChecksumFile(data);
-      // If the checksum file has not been created yet, return an empty 
instance.
-      // Since all writes happen as part of an atomic read-modify-write cycle 
that requires a write lock, two empty
-      // instances for the same container obtained only under the read lock 
will not conflict.
       if (!checksumFile.exists()) {
-        LOG.debug("No checksum file currently exists for container {} at the 
path {}. Returning an empty instance.",
-            containerID, checksumFile);
-        return ContainerProtos.ContainerChecksumInfo.newBuilder()
-            .setContainerID(containerID)
-            .build();
+        LOG.debug("No checksum file currently exists for container {} at the 
path {}", containerID, checksumFile);
+        return Optional.empty();
       }
       try (FileInputStream inStream = new FileInputStream(checksumFile)) {
         return captureLatencyNs(metrics.getReadContainerMerkleTreeLatencyNS(),
-            () -> ContainerProtos.ContainerChecksumInfo.parseFrom(inStream));
+            () -> 
Optional.of(ContainerProtos.ContainerChecksumInfo.parseFrom(inStream).toBuilder()));
       }
     } catch (IOException ex) {
       metrics.incrementMerkleTreeReadFailures();
       throw new IOException("Error occurred when reading container merkle tree 
for containerID "
-              + data.getContainerID(), ex);
-    } finally {
-      readLock.unlock();
+              + data.getContainerID() + " at path " + checksumFile, ex);
     }
   }
 
+  /**
+   * Callers should have acquired the write lock before calling this method.
+   */
   private void write(ContainerData data, ContainerProtos.ContainerChecksumInfo 
checksumInfo) throws IOException {
-    Lock writeLock = getWriteLock(data.getContainerID());
-    writeLock.lock();
-    try (FileOutputStream outStream = new 
FileOutputStream(getContainerChecksumFile(data))) {
-      captureLatencyNs(metrics.getWriteContainerMerkleTreeLatencyNS(),
-          () -> checksumInfo.writeTo(outStream));
+    // Make sure callers filled in required fields before writing.
+    Preconditions.assertTrue(checksumInfo.hasContainerID());
+
+    File checksumFile = getContainerChecksumFile(data);
+    File tmpChecksumFile = getTmpContainerChecksumFile(data);
+
+    try (FileOutputStream tmpOutputStream = new 
FileOutputStream(tmpChecksumFile)) {
+      // Write to a tmp file and rename it into place.
+      captureLatencyNs(metrics.getWriteContainerMerkleTreeLatencyNS(), () -> {
+        checksumInfo.writeTo(tmpOutputStream);
+        Files.move(tmpChecksumFile.toPath(), checksumFile.toPath(), 
ATOMIC_MOVE);
+      });
     } catch (IOException ex) {
+      // If the move failed and left behind the tmp file, the tmp file will be 
overwritten on the next successful write.
+      // Nothing reads directly from the tmp file.
       metrics.incrementMerkleTreeWriteFailures();
       throw new IOException("Error occurred when writing container merkle tree 
for containerID "
           + data.getContainerID(), ex);
-    } finally {
-      writeLock.unlock();
     }
   }
 
-  public ByteString getContainerChecksumInfo(KeyValueContainerData data)
-      throws IOException {
-    long containerID = data.getContainerID();
-    Lock readLock = getReadLock(containerID);
-    readLock.lock();
-    try {
-      File checksumFile = getContainerChecksumFile(data);
-
-      try (FileInputStream inStream = new FileInputStream(checksumFile)) {
-        return ByteString.readFrom(inStream);
-      } catch (FileNotFoundException ex) {
-        // TODO: Build the container checksum tree when it doesn't exist.
-        LOG.debug("No checksum file currently exists for container {} at the 
path {}. Returning an empty instance.",
-            containerID, checksumFile, ex);
-      } catch (IOException ex) {
-        throw new IOException("Error occured when reading checksum file for 
container " + containerID +
-            " at the path " + checksumFile, ex);
-      }
-      return ByteString.EMPTY;
-    } finally {
-      readLock.unlock();
+  /**
+   * Reads the container checksum info file from the disk as bytes.
+   * Callers are not required to hold a lock while calling this since writes 
are done to a tmp file and atomically
+   * swapped into place.
+   *
+   * @throws FileNotFoundException When the file does not exist. It may not 
have been generated yet for this container.
+   * @throws IOException On error reading the file.
+   */
+  public ByteString getContainerChecksumInfo(KeyValueContainerData data) 
throws IOException {
+    File checksumFile = getContainerChecksumFile(data);
+    try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+      return ByteString.readFrom(inStream);
     }
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
index bdf75763e0..969add4a15 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/DNContainerOperationClient.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
@@ -40,6 +41,7 @@ import jakarta.annotation.Nonnull;
 import org.apache.hadoop.ozone.container.common.helpers.TokenHelper;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,16 +91,33 @@ public class DNContainerOperationClient implements 
AutoCloseable {
     return xceiverClientManager;
   }
 
-  public ByteString getContainerMerkleTree(long containerId, DatanodeDetails 
dn)
+  /**
+   * Reads {@link ContainerProtos.ContainerChecksumInfo} for a specified 
container for the specified datanode.
+   *
+   * @throws IOException For errors communicating with the datanode.
+   * @throws StorageContainerException For errors obtaining the checksum info, 
including the file being missing or
+   * empty on the datanode, or the datanode not having a replica of the 
container.
+   * @throws InvalidProtocolBufferException If the file received from the 
datanode cannot be deserialized.
+   */
+  public ContainerProtos.ContainerChecksumInfo getContainerChecksumInfo(long 
containerId, DatanodeDetails dn)
       throws IOException {
     XceiverClientSpi xceiverClient = 
this.xceiverClientManager.acquireClient(createSingleNodePipeline(dn));
     try {
       String containerToken = encode(tokenHelper.getContainerToken(
           ContainerID.valueOf(containerId)));
-      ContainerProtos.GetContainerMerkleTreeResponseProto response =
-          ContainerProtocolCalls.getContainerMerkleTree(xceiverClient,
+      ContainerProtos.GetContainerChecksumInfoResponseProto response =
+          ContainerProtocolCalls.getContainerChecksumInfo(xceiverClient,
               containerId, containerToken);
-      return response.getContainerMerkleTree();
+      ByteString serializedChecksumInfo = response.getContainerChecksumInfo();
+      // Protobuf will convert an empty ByteString into a default value 
object. Treat this as an error instead, since
+      // the default value will not represent the state of the container.
+      // The server does not deserialize the file before sending it, so we 
must check the length on the client.
+      if (serializedChecksumInfo.isEmpty()) {
+        throw new StorageContainerException("Empty Container checksum file for 
container " + containerId + " received",
+            ContainerProtos.Result.IO_EXCEPTION);
+      } else {
+        return 
ContainerProtos.ContainerChecksumInfo.parseFrom(serializedChecksumInfo);
+      }
     } finally {
       this.xceiverClientManager.releaseClient(xceiverClient, false);
     }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 40e5bfaba6..21fa76a7be 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -832,7 +832,7 @@ public class HddsDispatcher implements ContainerDispatcher, 
Auditor {
     case StreamInit       : return DNAction.STREAM_INIT;
     case FinalizeBlock    : return DNAction.FINALIZE_BLOCK;
     case Echo             : return DNAction.ECHO;
-    case GetContainerMerkleTree : return DNAction.GET_CONTAINER_MERKLE_TREE;
+    case GetContainerChecksumInfo: return DNAction.GET_CONTAINER_CHECKSUM_INFO;
     default :
       LOG.debug("Invalid command type - {}", cmdType);
       return null;
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 2559d29da6..00d00caa18 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -99,6 +99,10 @@ import 
org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
@@ -109,6 +113,7 @@ import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
 import static 
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
@@ -298,8 +303,8 @@ public class KeyValueHandler extends Handler {
       return handler.handleFinalizeBlock(request, kvContainer);
     case Echo:
       return handler.handleEcho(request, kvContainer);
-    case GetContainerMerkleTree:
-      return handler.handleGetContainerMerkleTree(request, kvContainer);
+    case GetContainerChecksumInfo:
+      return handler.handleGetContainerChecksumInfo(request, kvContainer);
     default:
       return null;
     }
@@ -671,10 +676,10 @@ public class KeyValueHandler extends Handler {
     return getEchoResponse(request);
   }
 
-  ContainerCommandResponseProto handleGetContainerMerkleTree(
+  ContainerCommandResponseProto handleGetContainerChecksumInfo(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
 
-    if (!request.hasGetContainerMerkleTree()) {
+    if (!request.hasGetContainerChecksumInfo()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Malformed Read Container Merkle tree request. trace ID: {}",
             request.getTraceID());
@@ -682,11 +687,25 @@ public class KeyValueHandler extends Handler {
       return malformedRequest(request);
     }
 
+    // A container must have moved past the CLOSING state (i.e. be CLOSED, QUASI_CLOSED, or UNHEALTHY) before its checksum info is available.
     KeyValueContainerData containerData = kvContainer.getContainerData();
+    State state = containerData.getState();
+    boolean stateSupportsChecksumInfo = (state == CLOSED || state == 
QUASI_CLOSED || state == UNHEALTHY);
+    if (!stateSupportsChecksumInfo) {
+      return ContainerCommandResponseProto.newBuilder()
+          .setCmdType(request.getCmdType())
+          .setTraceID(request.getTraceID())
+          .setResult(UNCLOSED_CONTAINER_IO)
+          .setMessage("Checksum information is not available for containers in 
state " + state)
+          .build();
+    }
+
     ByteString checksumTree = null;
     try {
       checksumTree = checksumManager.getContainerChecksumInfo(containerData);
     } catch (IOException ex) {
+      // For file not found or other inability to read the file, return an 
error to the client.
+      LOG.error("Error occurred when reading checksum file for container {}", 
containerData.getContainerID(), ex);
       return ContainerCommandResponseProto.newBuilder()
           .setCmdType(request.getCmdType())
           .setTraceID(request.getTraceID())
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
index 2b32bb2fe9..0301304db7 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeTestUtils.java
@@ -123,6 +123,29 @@ public final class ContainerMerkleTreeTestUtils {
     }
   }
 
+  /**
+   * Builds a {@link ContainerMerkleTree} representing arbitrary data. This 
can be used to test that the same
+   * structure is preserved throughout serialization, deserialization, and API 
calls.
+   */
+  public static ContainerMerkleTree buildTestTree(ConfigurationSource conf) {
+    final long blockID1 = 1;
+    final long blockID2 = 2;
+    final long blockID3 = 3;
+    ContainerProtos.ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{1, 2, 3}));
+    ContainerProtos.ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{4, 5, 6}));
+    ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{7, 8, 9}));
+    ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{12, 11, 10}));
+    ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new 
byte[]{13, 14, 15}));
+    ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new 
byte[]{16, 17, 18}));
+
+    ContainerMerkleTree tree = new ContainerMerkleTree();
+    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+
+    return tree;
+  }
+
   /**
    * This function checks whether the container checksum file exists.
    */
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
index 6b5fe7bbe1..b482d746ef 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -23,24 +23,33 @@ import 
org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
-import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildChunk;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
 import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.readChecksumFile;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 class TestContainerChecksumTreeManager {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestContainerChecksumTreeManager.class);
+
   private static final long CONTAINER_ID = 1L;
   @TempDir
   private File testDir;
@@ -97,7 +106,7 @@ class TestContainerChecksumTreeManager {
   public void testWriteOnlyTreeToFile() throws Exception {
     
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
     assertEquals(metrics.getCreateMerkleTreeLatencyNS().lastStat().total(), 0);
-    ContainerMerkleTree tree = buildTestTree();
+    ContainerMerkleTree tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
     
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 
0);
     ContainerProtos.ContainerChecksumInfo checksumInfo = 
readChecksumFile(container);
@@ -160,7 +169,7 @@ class TestContainerChecksumTreeManager {
     List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
     checksumManager.markBlocksAsDeleted(container, new 
ArrayList<>(expectedBlocksToDelete));
     
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
-    ContainerMerkleTree tree = buildTestTree();
+    ContainerMerkleTree tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
     
assertTrue(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total() > 
0);
     
assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() > 
0);
@@ -178,7 +187,7 @@ class TestContainerChecksumTreeManager {
     
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
     assertEquals(metrics.getCreateMerkleTreeLatencyNS().lastStat().total(), 0);
     
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
-    ContainerMerkleTree tree = buildTestTree();
+    ContainerMerkleTree tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
     
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
     List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
@@ -198,7 +207,7 @@ class TestContainerChecksumTreeManager {
   public void testReadContainerMerkleTreeMetric() throws Exception {
     
assertEquals(metrics.getWriteContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
     
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
-    ContainerMerkleTree tree = buildTestTree();
+    ContainerMerkleTree tree = buildTestTree(config);
     checksumManager.writeContainerDataTree(container, tree);
     
assertEquals(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total(), 
0);
     checksumManager.writeContainerDataTree(container, tree);
@@ -206,28 +215,93 @@ class TestContainerChecksumTreeManager {
     
assertTrue(metrics.getReadContainerMerkleTreeLatencyNS().lastStat().total() > 
0);
   }
 
+  /**
+   * Updates to the container checksum file are written to a tmp file and then 
swapped in to place. Test that when
+   * the write to the tmp file fails, the main file that is read from is left 
intact.
+   */
+  @Test
+  public void testTmpFileWriteFailure() throws Exception {
+    File tmpFile = 
ContainerChecksumTreeManager.getTmpContainerChecksumFile(container);
+    File finalFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+
+    assertFalse(tmpFile.exists());
+    assertFalse(finalFile.exists());
+    ContainerMerkleTree tree = buildTestTree(config);
+    checksumManager.writeContainerDataTree(container, tree);
+    assertFalse(tmpFile.exists());
+    assertTrue(finalFile.exists());
+
+    // Make the write to the tmp file fail by removing permissions on its 
parent.
+    assertTrue(tmpFile.getParentFile().setWritable(false));
+    try {
+      checksumManager.writeContainerDataTree(container, tree);
+      fail("Write to the tmp file should have failed.");
+    } catch (IOException ex) {
+      LOG.info("Write to the tmp file failed as expected with the following 
exception: ", ex);
+    }
+    assertFalse(tmpFile.exists());
+    // The original file should still remain valid.
+    assertTrue(finalFile.exists());
+    assertTreesSortedAndMatch(tree.toProto(), 
readChecksumFile(container).getContainerMerkleTree());
+  }
+
+  @Test
+  public void testCorruptedFile() throws Exception {
+    // Write file
+    File finalFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+    assertFalse(finalFile.exists());
+    ContainerMerkleTree tree = buildTestTree(config);
+    checksumManager.writeContainerDataTree(container, tree);
+    assertTrue(finalFile.exists());
+
+    // Corrupt the file so it is not a valid protobuf.
+    Files.write(finalFile.toPath(), new byte[]{1, 2, 3},
+        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+    // Direct read should throw to verify the proto is not valid.
+    assertThrows(IOException.class, () -> readChecksumFile(container));
+
+    // The manager's read/modify/write cycle should account for the corruption 
and overwrite the entry.
+    // No exception should be thrown.
+    checksumManager.writeContainerDataTree(container, tree);
+    assertTreesSortedAndMatch(tree.toProto(), 
readChecksumFile(container).getContainerMerkleTree());
+  }
+
+  /**
+   * An empty file will be interpreted by protobuf to be an object with 
default values.
+   * The checksum manager should overwrite this if it is encountered.
+   */
+  @Test
+  public void testEmptyFile() throws Exception {
+    // Write file
+    File finalFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container);
+    assertFalse(finalFile.exists());
+    ContainerMerkleTree tree = buildTestTree(config);
+    checksumManager.writeContainerDataTree(container, tree);
+    assertTrue(finalFile.exists());
+
+    // Truncate the file to zero length.
+    Files.write(finalFile.toPath(), new byte[0],
+        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+    assertEquals(0, finalFile.length());
+
+    // The truncated file will be interpreted as an empty protobuf object.
+    // Use a test helper method to read it directly and confirm this.
+    ContainerProtos.ContainerChecksumInfo emptyInfo = 
readChecksumFile(container);
+    assertFalse(emptyInfo.hasContainerID());
+    assertFalse(emptyInfo.hasContainerMerkleTree());
+
+    // The manager's read/modify/write cycle should account for the empty file 
and overwrite it with a valid entry.
+    // No exception should be thrown.
+    checksumManager.writeContainerDataTree(container, tree);
+    ContainerProtos.ContainerChecksumInfo info = readChecksumFile(container);
+    assertTreesSortedAndMatch(tree.toProto(), info.getContainerMerkleTree());
+    assertEquals(CONTAINER_ID, info.getContainerID());
+  }
+
   @Test
   public void testChecksumTreeFilePath() {
     assertEquals(checksumFile.getAbsolutePath(),
         
ContainerChecksumTreeManager.getContainerChecksumFile(container).getAbsolutePath());
   }
-
-  private ContainerMerkleTree buildTestTree() {
-    final long blockID1 = 1;
-    final long blockID2 = 2;
-    final long blockID3 = 3;
-    ContainerProtos.ChunkInfo b1c1 = buildChunk(config, 0, ByteBuffer.wrap(new 
byte[]{1, 2, 3}));
-    ContainerProtos.ChunkInfo b1c2 = buildChunk(config, 1, ByteBuffer.wrap(new 
byte[]{4, 5, 6}));
-    ContainerProtos.ChunkInfo b2c1 = buildChunk(config, 0, ByteBuffer.wrap(new 
byte[]{7, 8, 9}));
-    ContainerProtos.ChunkInfo b2c2 = buildChunk(config, 1, ByteBuffer.wrap(new 
byte[]{12, 11, 10}));
-    ContainerProtos.ChunkInfo b3c1 = buildChunk(config, 0, ByteBuffer.wrap(new 
byte[]{13, 14, 15}));
-    ContainerProtos.ChunkInfo b3c2 = buildChunk(config, 1, ByteBuffer.wrap(new 
byte[]{16, 17, 18}));
-
-    ContainerMerkleTree tree = new ContainerMerkleTree();
-    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
-    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
-    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
-
-    return tree;
-  }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index e13ba38181..30a8a9bcbc 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -22,9 +22,11 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,6 +40,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
 import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -62,6 +65,9 @@ import org.apache.ozone.test.GenericTestUtils;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static 
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.QUASI_CLOSED;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -485,6 +491,34 @@ public class TestKeyValueHandler {
     Assertions.assertEquals(1, icrCount.get());
   }
 
+  @Test
+  public void testGetContainerChecksumInfoOnInvalidContainerStates() {
+    when(handler.handleGetContainerChecksumInfo(any(), 
any())).thenCallRealMethod();
+
+    // Only mock what is necessary for the request to fail. This test does not 
cover allowed states.
+    KeyValueContainer container = mock(KeyValueContainer.class);
+    KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+    when(container.getContainerData()).thenReturn(containerData);
+
+    ContainerCommandRequestProto request = 
mock(ContainerCommandRequestProto.class);
+    when(request.hasGetContainerChecksumInfo()).thenReturn(true);
+    
when(request.getCmdType()).thenReturn(ContainerProtos.Type.GetContainerChecksumInfo);
+    when(request.getTraceID()).thenReturn("123");
+
+    Set<State> disallowedStates = EnumSet.allOf(State.class);
+    disallowedStates.removeAll(EnumSet.of(CLOSED, QUASI_CLOSED, UNHEALTHY));
+
+    for (State state: disallowedStates) {
+      when(containerData.getState()).thenReturn(state);
+      ContainerProtos.ContainerCommandResponseProto response = 
handler.handleGetContainerChecksumInfo(request,
+          container);
+      assertNotNull(response);
+      assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO, 
response.getResult());
+      assertTrue(response.getMessage().contains(state.toString()), "Response 
message did not contain the container " +
+          "state " + state);
+    }
+  }
+
   private static ContainerCommandRequestProto createContainerRequest(
       String datanodeId, long containerID) {
     return ContainerCommandRequestProto.newBuilder()
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 4533a43403..66af02c83a 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -107,7 +107,7 @@ enum Type {
   StreamWrite = 20;
   FinalizeBlock = 21;
   Echo = 22;
-  GetContainerMerkleTree = 23;
+  GetContainerChecksumInfo = 23;
 }
 
 
@@ -216,7 +216,7 @@ message ContainerCommandRequestProto {
   optional   uint32 version = 24;
   optional   FinalizeBlockRequestProto finalizeBlock = 25;
   optional   EchoRequestProto echo = 26;
-  optional   GetContainerMerkleTreeRequestProto getContainerMerkleTree = 27;
+  optional   GetContainerChecksumInfoRequestProto getContainerChecksumInfo = 
27;
 }
 
 message ContainerCommandResponseProto {
@@ -248,7 +248,7 @@ message ContainerCommandResponseProto {
   optional   GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
   optional   FinalizeBlockResponseProto finalizeBlock = 22;
   optional   EchoResponseProto echo = 23;
-  optional   GetContainerMerkleTreeResponseProto getContainerMerkleTree = 24;
+  optional   GetContainerChecksumInfoResponseProto getContainerChecksumInfo = 
24;
 }
 
 message ContainerDataProto {
@@ -403,13 +403,13 @@ message  EchoResponseProto {
   optional bytes payload = 1;
 }
 
-message GetContainerMerkleTreeRequestProto {
+message GetContainerChecksumInfoRequestProto {
   optional int64 containerID = 1;
 }
 
-message GetContainerMerkleTreeResponseProto {
+message GetContainerChecksumInfoResponseProto {
   optional int64 containerID = 1;
-  optional bytes containerMerkleTree = 2;
+  optional bytes containerChecksumInfo = 2;
 }
 // Chunk Operations
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index e0bbe32abb..0d0bd3d83e 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -249,7 +249,7 @@ public class TestOzoneContainerWithTLS {
              new DNContainerOperationClient(conf, caClient, keyClient)) {
       client = clientManager.acquireClient(pipeline);
       long containerId = createAndCloseContainer(client, 
containerTokenEnabled);
-      dnClient.getContainerMerkleTree(containerId, dn);
+      dnClient.getContainerChecksumInfo(containerId, dn);
     } finally {
       if (container != null) {
         container.stop();
@@ -275,16 +275,16 @@ public class TestOzoneContainerWithTLS {
       TokenHelper tokenHelper = new TokenHelper(new SecurityConfig(conf), 
keyClient);
       String containerToken = encode(tokenHelper.getContainerToken(
           ContainerID.valueOf(containerId)));
-      ContainerProtos.GetContainerMerkleTreeResponseProto response =
-          ContainerProtocolCalls.getContainerMerkleTree(client,
+      ContainerProtos.GetContainerChecksumInfoResponseProto response =
+          ContainerProtocolCalls.getContainerChecksumInfo(client,
               containerId, containerToken);
       // Getting container merkle tree with valid container token
-      assertFalse(response.getContainerMerkleTree().isEmpty());
+      assertFalse(response.getContainerChecksumInfo().isEmpty());
 
       // Getting container merkle tree with invalid container token
       XceiverClientSpi finalClient = client;
       StorageContainerException exception = 
assertThrows(StorageContainerException.class,
-          () -> ContainerProtocolCalls.getContainerMerkleTree(
+          () -> ContainerProtocolCalls.getContainerChecksumInfo(
           finalClient, containerId, "invalidContainerToken"));
       assertEquals(ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED, 
exception.getResult());
     } finally {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
index bc235292d2..64af0148a8 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/checksum/TestContainerCommandReconciliation.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.ozone.dn.checksum;
 
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -34,41 +34,43 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
 import org.apache.hadoop.ozone.container.checksum.ContainerMerkleTree;
 import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import 
org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
 import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
-import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
 import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.assertTreesSortedAndMatch;
-import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildChunk;
+import static 
org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.buildTestTree;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * This class tests container commands for reconciliation.
  */
 public class TestContainerCommandReconciliation {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestContainerCommandReconciliation.class);
   private static MiniOzoneCluster cluster;
   private static OzoneClient rpcClient;
   private static ObjectStore store;
@@ -83,11 +85,10 @@ public class TestContainerCommandReconciliation {
     testDir = GenericTestUtils.getTestDir(
         TestContainerCommandReconciliation.class.getSimpleName());
     conf = new OzoneConfiguration();
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
     conf.set(OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    conf.getBoolean(OZONE_SECURITY_ENABLED_KEY,
-        OZONE_SECURITY_ENABLED_DEFAULT);
+    // Disable the container scanner so it does not create merkle tree files 
that interfere with this test.
+    conf.getObject(ContainerScannerConfiguration.class).setEnabled(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
         .build();
@@ -108,42 +109,172 @@ public class TestContainerCommandReconciliation {
     }
   }
 
+  /**
+   * Container checksum trees are only generated for non-open containers.
+   * Calling the API on a non-open container should fail.
+   */
   @Test
-  public void testGetContainerMerkleTree() throws Exception {
-    final String volumeName = UUID.randomUUID().toString();
-    final String bucketName = UUID.randomUUID().toString();
-    final String keyName = UUID.randomUUID().toString();
-    byte[] data = "Test content".getBytes(UTF_8);
-    store.createVolume(volumeName);
-    OzoneVolume volume = store.getVolume(volumeName);
-    volume.createBucket(bucketName);
-    OzoneBucket bucket = volume.getBucket(bucketName);
-    // Write Key
-    try (OzoneOutputStream os = bucket.createKey(keyName, data.length)) {
-      IOUtils.write(data, os);
+  public void testGetChecksumInfoOpenReplica() throws Exception {
+    long containerID = writeDataAndGetContainer(false);
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+    StorageContainerException ex = 
assertThrows(StorageContainerException.class,
+        () -> dnClient.getContainerChecksumInfo(containerID, 
targetDN.getDatanodeDetails()));
+    assertEquals(ex.getResult(), ContainerProtos.Result.UNCLOSED_CONTAINER_IO);
+  }
+
+  /**
+   * Tests reading the container checksum info file from a datanode who does 
not have a replica for the requested
+   * container.
+   */
+  @Test
+  public void testGetChecksumInfoNonexistentReplica() {
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+
+    // Find a container ID that does not exist in the cluster. For a small 
test this should be a good starting
+    // point, but modify it just in case.
+    long badIDCheck = 1_000_000;
+    while (cluster.getStorageContainerManager().getContainerManager()
+        .containerExist(ContainerID.valueOf(badIDCheck))) {
+      badIDCheck++;
     }
 
-    // Close container
-    ContainerInfo container = cluster.getStorageContainerManager()
-        .getContainerManager().getContainers().get(0);
-    closeContainer(container);
+    final long nonexistentContainerID = badIDCheck;
+    StorageContainerException ex = 
assertThrows(StorageContainerException.class,
+        () -> dnClient.getContainerChecksumInfo(nonexistentContainerID, 
targetDN.getDatanodeDetails()));
+    assertEquals(ex.getResult(), ContainerProtos.Result.CONTAINER_NOT_FOUND);
+  }
+
+  /**
+   * Tests reading the container checksum info file from a datanode where the 
container exists, but the file has not
+   * yet been created.
+   */
+  @Test
+  public void testGetChecksumInfoNonexistentFile() throws Exception {
+    long containerID = writeDataAndGetContainer(true);
+    // Pick a datanode and remove its checksum file.
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+    Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    // Closing the container should have generated the tree file.
+    assertTrue(treeFile.exists());
+    assertTrue(treeFile.delete());
+
+    StorageContainerException ex = 
assertThrows(StorageContainerException.class, () ->
+        dnClient.getContainerChecksumInfo(containerID, 
targetDN.getDatanodeDetails()));
+    assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+    assertTrue(ex.getMessage().contains("(No such file or directory"), 
ex.getMessage() +
+        " did not contain the expected string");
+  }
+
+  /**
+   * Tests reading the container checksum info file from a datanode where the 
datanode fails to read the file from
+   * the disk.
+   */
+  @Test
+  public void testGetChecksumInfoServerIOError() throws Exception {
+    long containerID = writeDataAndGetContainer(true);
+    // Pick a datanode and remove its checksum file.
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+    Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    assertTrue(treeFile.exists());
+    // Make the server unable to read the file.
+    assertTrue(treeFile.setReadable(false));
+
+    StorageContainerException ex = 
assertThrows(StorageContainerException.class, () ->
+        dnClient.getContainerChecksumInfo(containerID, 
targetDN.getDatanodeDetails()));
+    assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+  }
 
-    //Write Checksum Data
-    ContainerMerkleTree tree = buildTestTree();
-    writeChecksumFileToDatanode(container, tree);
+  /**
+   * Tests reading the container checksum info file from a datanode where the 
file is corrupt.
+   * The datanode does not deserialize the file before sending it, so there 
should be no error on the server side
+   * when sending the file. The client should raise an error trying to 
deserialize it.
+   */
+  @Test
+  public void testGetCorruptChecksumInfo() throws Exception {
+    long containerID = writeDataAndGetContainer(true);
+
+    // Pick a datanode and corrupt its checksum file.
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+    Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    Files.write(treeFile.toPath(), new byte[]{1, 2, 3},
+        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+
+    // Reading the file from the replica should fail when the client tries to 
deserialize it.
+    assertThrows(InvalidProtocolBufferException.class, () -> 
dnClient.getContainerChecksumInfo(containerID,
+        targetDN.getDatanodeDetails()));
+  }
+
+  @Test
+  public void testGetEmptyChecksumInfo() throws Exception {
+    long containerID = writeDataAndGetContainer(true);
 
-    // Verify all the ContainerMerkle Tree matches
+    // Pick a datanode and truncate its checksum file to zero length.
+    HddsDatanodeService targetDN = cluster.getHddsDatanodes().get(0);
+    Container<?> container = targetDN.getDatanodeStateMachine().getContainer()
+        .getContainerSet().getContainer(containerID);
+    File treeFile = 
ContainerChecksumTreeManager.getContainerChecksumFile(container.getContainerData());
+    // TODO After HDDS-10379 the file will already exist and need to be 
overwritten.
+    assertTrue(treeFile.exists());
+    Files.write(treeFile.toPath(), new byte[]{},
+        StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+    assertEquals(0, treeFile.length());
+
+    // The client will get an empty byte string back. It should raise this as 
an error instead of returning a default
+    // protobuf object.
+    StorageContainerException ex = 
assertThrows(StorageContainerException.class, () ->
+        dnClient.getContainerChecksumInfo(containerID, 
targetDN.getDatanodeDetails()));
+    assertEquals(ContainerProtos.Result.IO_EXCEPTION, ex.getResult());
+  }
+
+  @Test
+  public void testGetChecksumInfoSuccess() throws Exception {
+    long containerID = writeDataAndGetContainer(true);
+    // Overwrite the existing tree with a custom one for testing. We will 
check that it is returned properly from the
+    // API.
+    ContainerMerkleTree tree = buildTestTree(conf);
+    writeChecksumFileToDatanodes(containerID, tree);
+
+    // Verify trees match on all replicas.
+    // This test is expecting Ratis 3 data written on a 3 node cluster, so 
every node has a replica.
+    assertEquals(3, cluster.getHddsDatanodes().size());
     List<DatanodeDetails> datanodeDetails = cluster.getHddsDatanodes().stream()
         
.map(HddsDatanodeService::getDatanodeDetails).collect(Collectors.toList());
     for (DatanodeDetails dn: datanodeDetails) {
-      ByteString merkleTree = 
dnClient.getContainerMerkleTree(container.getContainerID(), dn);
       ContainerProtos.ContainerChecksumInfo containerChecksumInfo =
-          ContainerProtos.ContainerChecksumInfo.parseFrom(merkleTree);
+          dnClient.getContainerChecksumInfo(containerID, dn);
       assertTreesSortedAndMatch(tree.toProto(), 
containerChecksumInfo.getContainerMerkleTree());
     }
   }
 
-  public static void writeChecksumFileToDatanode(ContainerInfo container, 
ContainerMerkleTree tree) throws Exception {
+  private long writeDataAndGetContainer(boolean close) throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName);
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    byte[] data = "Test content".getBytes(UTF_8);
+    // Write Key
+    try (OzoneOutputStream os = TestHelper.createKey("testkey", RATIS, THREE, 
0, store, volumeName, bucketName)) {
+      IOUtils.write(data, os);
+    }
+
+    long containerID = bucket.getKey("testkey").getOzoneKeyLocations().stream()
+        .findFirst().get().getContainerID();
+    if (close) {
+      TestHelper.waitForContainerClose(cluster, containerID);
+    }
+    return containerID;
+  }
+
+  public static void writeChecksumFileToDatanodes(long containerID, 
ContainerMerkleTree tree) throws Exception {
     // Write Container Merkle Tree
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       KeyValueHandler keyValueHandler =
@@ -151,46 +282,9 @@ public class TestContainerCommandReconciliation {
               .getHandler(ContainerProtos.ContainerType.KeyValueContainer);
       KeyValueContainer keyValueContainer =
           (KeyValueContainer) dn.getDatanodeStateMachine().getContainer().getController()
-              .getContainer(container.getContainerID());
+              .getContainer(containerID);
       keyValueHandler.getChecksumManager().writeContainerDataTree(
           keyValueContainer.getContainerData(), tree);
     }
   }
-
-  public static void closeContainer(ContainerInfo container) throws Exception {
-    //Close the container first.
-    cluster.getStorageContainerManager().getClientProtocolServer().closeContainer(container.getContainerID());
-    GenericTestUtils.waitFor(() -> checkContainerState(container), 100, 50000);
-  }
-
-  private static boolean checkContainerState(ContainerInfo container) {
-    ContainerInfo containerInfo =  null;
-    try {
-      containerInfo = cluster.getStorageContainerManager()
-          .getContainerInfo(container.getContainerID());
-      return containerInfo.getState() == HddsProtos.LifeCycleState.CLOSED;
-    } catch (IOException e) {
-      LOG.error("Error when getting the container info", e);
-    }
-    return false;
-  }
-
-  public static ContainerMerkleTree buildTestTree() {
-    final long blockID1 = 1;
-    final long blockID2 = 2;
-    final long blockID3 = 3;
-    ContainerProtos.ChunkInfo b1c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
-    ContainerProtos.ChunkInfo b1c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{4, 5, 6}));
-    ContainerProtos.ChunkInfo b2c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{7, 8, 9}));
-    ContainerProtos.ChunkInfo b2c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{12, 11, 10}));
-    ContainerProtos.ChunkInfo b3c1 = buildChunk(conf, 0, ByteBuffer.wrap(new byte[]{13, 14, 15}));
-    ContainerProtos.ChunkInfo b3c2 = buildChunk(conf, 1, ByteBuffer.wrap(new byte[]{16, 17, 18}));
-
-    ContainerMerkleTree tree = new ContainerMerkleTree();
-    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
-    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
-    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
-
-    return tree;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to