This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to 
refs/heads/HDDS-10239-container-reconciliation by this push:
     new d585363a7a HDDS-10887. Implement a basic Merkle Tree Manager. (#6778)
d585363a7a is described below

commit d585363a7ab509f4c2b5428dc6046d2856282eac
Author: Ethan Rose <[email protected]>
AuthorDate: Fri Jun 28 15:46:22 2024 -0400

    HDDS-10887. Implement a basic Merkle Tree Manager. (#6778)
---
 .../checksum/ContainerChecksumTreeManager.java     | 171 ++++++++++++
 .../container/checksum/ContainerMerkleTree.java    | 180 +++++++++++++
 .../ozone/container/checksum/package-info.java     |  21 ++
 .../common/statemachine/DatanodeConfiguration.java |  27 ++
 .../checksum/TestContainerChecksumTreeManager.java | 169 ++++++++++++
 .../checksum/TestContainerMerkleTree.java          | 293 +++++++++++++++++++++
 .../src/main/proto/DatanodeClientProtocol.proto    |  24 ++
 7 files changed, 885 insertions(+)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
new file mode 100644
index 0000000000..939c6d08b3
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerChecksumTreeManager.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
+
+import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.utils.SimpleStriped;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class coordinates reading and writing Container checksum information 
for all containers.
+ */
+public class ContainerChecksumTreeManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ContainerChecksumTreeManager.class);
+
+  // Used to coordinate reads and writes to each container's checksum file.
+  // Each container ID is mapped to a stripe.
+  private final Striped<ReadWriteLock> fileLock;
+
+  /**
+   * Creates one instance that should be used to coordinate all container 
checksum info within a datanode.
+   */
+  public ContainerChecksumTreeManager(DatanodeConfiguration dnConf) {
+    fileLock = 
SimpleStriped.readWriteLock(dnConf.getContainerChecksumLockStripes(), true);
+  }
+
+  /**
+   * Replaces the data merkle tree stored in the specified container's checksum file with {@code tree}.
+   * Everything else already in the file is carried over unchanged. If the file does not exist yet, it is created.
+   * Concurrent writes to the same file are coordinated internally.
+   */
+  public void writeContainerDataTree(KeyValueContainerData data, ContainerMerkleTree tree) throws IOException {
+    final Lock lock = getWriteLock(data.getContainerID());
+    lock.lock();
+    try {
+      // Start from what is currently persisted so that fields other than the tree survive the rewrite.
+      ContainerProtos.ContainerChecksumInfo.Builder builder = read(data).toBuilder();
+      builder.setContainerMerkleTree(tree.toProto());
+      write(data, builder.build());
+      LOG.debug("Data merkle tree for container {} updated", data.getContainerID());
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Merges {@code deletedBlockIDs} into the sorted, de-duplicated deleted block list in the container's checksum file.
+   * All other content of the file remains unchanged.
+   * Concurrent writes to the same file are coordinated internally.
+   */
+  public void markBlocksAsDeleted(KeyValueContainerData data, SortedSet<Long> deletedBlockIDs) throws IOException {
+    Lock writeLock = getWriteLock(data.getContainerID());
+    writeLock.lock();
+    try {
+      ContainerProtos.ContainerChecksumInfo.Builder checksumInfoBuilder = read(data).toBuilder();
+      // Although the persisted block list should already be sorted, we will sort it here to make sure.
+      // This will automatically fix any bugs in the persisted order that may show up.
+      SortedSet<Long> sortedDeletedBlockIDs = new TreeSet<>(checksumInfoBuilder.getDeletedBlocksList());
+      // Merging through the set also drops IDs that were already recorded. Each added ID costs O(log n).
+      sortedDeletedBlockIDs.addAll(deletedBlockIDs);
+
+      ContainerProtos.ContainerChecksumInfo updatedChecksumInfo = checksumInfoBuilder
+          .clearDeletedBlocks()
+          .addAllDeletedBlocks(sortedDeletedBlockIDs)
+          .build();
+      write(data, updatedChecksumInfo);
+      LOG.debug("Deleted block list for container {} updated", data.getContainerID());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public ContainerDiff diff(KeyValueContainerData thisContainer, File 
otherContainerTree)
+      throws IOException {
+    // TODO HDDS-10928 compare the checksum info of the two containers and return a summary.
+    //  Callers can act on this summary to repair their container replica using the peer's replica.
+    //  This method will need only the read lock, which is currently taken only by writers that already hold
+    //  the write lock.
+    return new ContainerDiff();
+  }
+
+  /**
+   * Returns the container checksum tree file for the specified container 
without deserializing it.
+   */
+  public File getContainerChecksumFile(KeyValueContainerData data) {
+    return new File(data.getMetadataPath(), data.getContainerID() + ".tree");
+  }
+
+  private Lock getReadLock(long containerID) {
+    return fileLock.get(containerID).readLock();
+  }
+
+  private Lock getWriteLock(long containerID) {
+    return fileLock.get(containerID).writeLock();
+  }
+
+  private ContainerProtos.ContainerChecksumInfo read(KeyValueContainerData 
data) throws IOException {
+    long containerID = data.getContainerID();
+    Lock readLock = getReadLock(containerID);
+    readLock.lock();
+    try {
+      File checksumFile = getContainerChecksumFile(data);
+      // If the checksum file has not been created yet, return an empty 
instance.
+      // Since all writes happen as part of an atomic read-modify-write cycle 
that requires a write lock, two empty
+      // instances for the same container obtained only under the read lock 
will not conflict.
+      if (!checksumFile.exists()) {
+        LOG.debug("No checksum file currently exists for container {} at the 
path {}. Returning an empty instance.",
+            containerID, checksumFile);
+        return ContainerProtos.ContainerChecksumInfo.newBuilder()
+            .setContainerID(containerID)
+            .build();
+      }
+      try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+        return ContainerProtos.ContainerChecksumInfo.parseFrom(inStream);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private void write(KeyValueContainerData data, 
ContainerProtos.ContainerChecksumInfo checksumInfo)
+      throws IOException {
+    Lock writeLock = getWriteLock(data.getContainerID());
+    writeLock.lock();
+    try (FileOutputStream outStream = new 
FileOutputStream(getContainerChecksumFile(data))) {
+      checksumInfo.writeTo(outStream);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * This class represents the difference between our replica of a container 
and a peer's replica of a container.
+   * It summarizes the operations we need to do to reconcile our replica with 
the peer replica it was compared to.
+   *
+   * TODO HDDS-10928
+   */
+  public static class ContainerDiff {
+    public ContainerDiff() {
+
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java
new file mode 100644
index 0000000000..9eeb50b649
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTree.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.common.ChecksumByteBuffer;
+import org.apache.hadoop.ozone.common.ChecksumByteBufferFactory;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * This class represents a Merkle tree that provides one checksum for all data 
within a container.
+ *
+ * As the leaves of the tree, a checksum for each chunk is computed by taking 
a checksum of all checksums within that
+ * chunk. Each chunk checksum in a block is further checksummed together to 
generate the block level checksum. Finally,
+ * The checksums of all blocks are checksummed together to create a container 
level checksum.
+ * Note that checksums are order dependent. Chunk checksums are sorted by their
+ * offset within a block, and block checksums are sorted by their block ID.
+ *
+ * This class can be used to construct a consistent and completely filled 
{@link ContainerProtos.ContainerMerkleTree}
+ * object. It allows building a container merkle tree from scratch by 
incrementally adding chunks.
+ * The final checksums at higher levels of the tree are not calculated until
+ * {@link ContainerMerkleTree#toProto} is called.
+ */
+public class ContainerMerkleTree {
+
+  private final SortedMap<Long, BlockMerkleTree> id2Block;
+
+  /**
+   * Constructs an empty Container merkle tree object.
+   */
+  public ContainerMerkleTree() {
+    id2Block = new TreeMap<>();
+  }
+
+  /**
+   * Adds chunks to a block in the tree. The block entry will be created if it 
is the first time adding chunks to it.
+   * If the block entry already exists, the chunks will be added to the 
existing chunks for that block.
+   *
+   * @param blockID The ID of the block that these chunks belong to.
+   * @param chunks A list of chunks to add to this block. The chunks will be 
sorted internally by their offset.
+   */
+  public void addChunks(long blockID, Collection<ChunkInfo> chunks) {
+    id2Block.computeIfAbsent(blockID, BlockMerkleTree::new).addChunks(chunks);
+  }
+
+  /**
+   * Builds the protobuf form of this tree, deriving every block checksum and the container level data checksum
+   * from the chunk checksums added so far. No tree checksum is computed before this method is called.
+   *
+   * @return A complete protobuf object representation of this tree.
+   */
+  public ContainerProtos.ContainerMerkleTree toProto() {
+    final ContainerProtos.ContainerMerkleTree.Builder treeProto = ContainerProtos.ContainerMerkleTree.newBuilder();
+    // One long-sized slot per block; block checksums are appended in ascending block ID order.
+    final ByteBuffer blockChecksums = ByteBuffer.allocate(id2Block.size() * Long.BYTES);
+
+    for (BlockMerkleTree block : id2Block.values()) {
+      final ContainerProtos.BlockMerkleTree blockProto = block.toProto();
+      blockChecksums.putLong(blockProto.getBlockChecksum());
+      treeProto.addBlockMerkleTree(blockProto);
+    }
+    // Switch the buffer from writing to reading before feeding it to the checksum.
+    blockChecksums.flip();
+
+    final ChecksumByteBuffer crc = ChecksumByteBufferFactory.crc32Impl();
+    crc.update(blockChecksums);
+    treeProto.setDataChecksum(crc.getValue());
+    return treeProto.build();
+  }
+
+  /**
+   * Represents a merkle tree for a single block within a container.
+   */
+  private static class BlockMerkleTree {
+    private final long blockID;
+    // Chunks keyed by their offset within the block, so iteration (and therefore checksum order) follows offsets.
+    private final SortedMap<Long, ChunkMerkleTree> offset2Chunk = new TreeMap<>();
+
+    BlockMerkleTree(long blockID) {
+      this.blockID = blockID;
+    }
+
+    /**
+     * Adds the specified chunks to this block. A chunk whose offset is already present replaces the chunk
+     * previously stored at that offset, so offsets must be unique within the block.
+     *
+     * @param chunks The chunks to add to this block.
+     */
+    public void addChunks(Collection<ChunkInfo> chunks) {
+      chunks.forEach(chunk -> offset2Chunk.put(chunk.getOffset(), new ChunkMerkleTree(chunk)));
+    }
+
+    /**
+     * Computes the block checksum from the checksums of this block's chunks, taken in offset order, and returns
+     * the block with all of its chunks as a protobuf object.
+     *
+     * @return A complete protobuf object representation of this block tree.
+     */
+    public ContainerProtos.BlockMerkleTree toProto() {
+      final ContainerProtos.BlockMerkleTree.Builder blockProto = ContainerProtos.BlockMerkleTree.newBuilder();
+      blockProto.setBlockID(blockID);
+      final ByteBuffer chunkChecksums = ByteBuffer.allocate(offset2Chunk.size() * Long.BYTES);
+
+      // Checksums inside each chunk are assumed to be stored in the order they were written; that assumption is
+      // already built in to the code that reads and writes them (see ChunkInputStream#validateChunk for an
+      // example on the client read path). Nothing else is available to sort them by, so the stored order is used.
+      for (ChunkMerkleTree chunk : offset2Chunk.values()) {
+        final ContainerProtos.ChunkMerkleTree chunkProto = chunk.toProto();
+        chunkChecksums.putLong(chunkProto.getChunkChecksum());
+        blockProto.addChunkMerkleTree(chunkProto);
+      }
+      chunkChecksums.flip();
+
+      final ChecksumByteBuffer crc = ChecksumByteBufferFactory.crc32Impl();
+      crc.update(chunkChecksums);
+      return blockProto.setBlockChecksum(crc.getValue()).build();
+    }
+  }
+
+  /**
+   * Represents a merkle tree leaf for a single chunk within a container.
+   * A chunk stores one checksum per "bytesPerChecksum" interval of its data.
+   * This class aggregates those into a single checksum for the whole chunk.
+   */
+  private static class ChunkMerkleTree {
+    private final ChunkInfo chunk;
+
+    ChunkMerkleTree(ChunkInfo chunk) {
+      this.chunk = chunk;
+    }
+
+    /**
+     * Reduces the checksums stored in this ChunkInfo to one chunk level checksum. No chunk level checksum is
+     * computed anywhere else.
+     *
+     * @return A complete protobuf representation of this chunk as a leaf in the container merkle tree.
+     */
+    public ContainerProtos.ChunkMerkleTree toProto() {
+      final ChecksumByteBuffer crc = ChecksumByteBufferFactory.crc32Impl();
+      // Fold the stored checksums into a single CRC, in the order the chunk lists them.
+      for (ByteString storedChecksum : chunk.getChecksumData().getChecksums()) {
+        crc.update(storedChecksum.asReadOnlyByteBuffer());
+      }
+
+      final ContainerProtos.ChunkMerkleTree.Builder leaf = ContainerProtos.ChunkMerkleTree.newBuilder();
+      leaf.setOffset(chunk.getOffset());
+      leaf.setLength(chunk.getLen());
+      leaf.setChunkChecksum(crc.getValue());
+      return leaf.build();
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java
new file mode 100644
index 0000000000..9dfdc88bf1
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+/**
+ * This package contains classes handling container level checksums.
+ */
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index a8b0d8cfa4..28bbb17aa8 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -74,6 +74,7 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
       "hdds.datanode.wait.on.all.followers";
   public static final String CONTAINER_SCHEMA_V3_ENABLED =
       "hdds.datanode.container.schema.v3.enabled";
+  public static final String CONTAINER_CHECKSUM_LOCK_STRIPES_KEY = 
"hdds.datanode.container.checksum.lock.stripes";
 
   static final boolean CHUNK_DATA_VALIDATION_CHECK_DEFAULT = false;
 
@@ -109,6 +110,7 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
       "hdds.datanode.rocksdb.delete_obsolete_files_period";
   public static final Boolean
       OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT = false;
+  public static final int CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT = 127;
 
   /**
    * Number of threads per volume that Datanode will use for chunk read.
@@ -550,6 +552,21 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
   private boolean bCheckEmptyContainerDir =
       OZONE_DATANODE_CHECK_EMPTY_CONTAINER_DIR_ON_DELETE_DEFAULT;
 
+  /**
+   * Number of lock stripes used to coordinate modifications to
+   * container checksum information.
+   */
+  @Config(key = "container.checksum.lock.stripes",
+      type = ConfigType.INT,
+      defaultValue = "127",
+      tags = { DATANODE },
+      description = "The number of lock stripes used to coordinate 
modifications to container checksum information. " +
+          "This information is only updated after a container is closed and 
does not affect the data read or write" +
+          " path. Each container in the datanode will be mapped to one lock 
which will only be held while its " +
+          "checksum information is updated."
+  )
+  private int containerChecksumLockStripes = 
CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
+
   @PostConstruct
   public void validate() {
     if (containerDeleteThreads < 1) {
@@ -683,6 +700,12 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
       rocksdbDeleteObsoleteFilesPeriod =
           ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICRO_SECONDS_DEFAULT;
     }
+
+    if (containerChecksumLockStripes < 1) {
+      LOG.warn("{} must be at least 1. Defaulting to {}", 
CONTAINER_CHECKSUM_LOCK_STRIPES_KEY,
+          CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT);
+      containerChecksumLockStripes = CONTAINER_CHECKSUM_LOCK_STRIPES_DEFAULT;
+    }
   }
 
   public void setContainerDeleteThreads(int containerDeleteThreads) {
@@ -910,4 +933,8 @@ public class DatanodeConfiguration extends 
ReconfigurableConfig {
   public void setAutoCompactionSmallSstFileNum(int num) {
     this.autoCompactionSmallSstFileNum = num;
   }
+
+  public int getContainerChecksumLockStripes() {
+    return containerChecksumLockStripes;
+  }
 }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
new file mode 100644
index 0000000000..767eed8a73
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerChecksumTreeManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+
+import static 
org.apache.hadoop.ozone.container.checksum.TestContainerMerkleTree.assertTreesSortedAndMatch;
+import static 
org.apache.hadoop.ozone.container.checksum.TestContainerMerkleTree.buildChunk;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestContainerChecksumTreeManager {
+
+  private static final long CONTAINER_ID = 1L;
+  @TempDir
+  private File testDir;
+  private KeyValueContainerData container;
+  private File checksumFile;
+  private ContainerChecksumTreeManager checksumManager;
+
+  @BeforeEach
+  public void init() {
+    container = mock(KeyValueContainerData.class);
+    when(container.getContainerID()).thenReturn(CONTAINER_ID);
+    when(container.getMetadataPath()).thenReturn(testDir.getAbsolutePath());
+    checksumFile = new File(testDir, CONTAINER_ID + ".tree");
+    checksumManager = new ContainerChecksumTreeManager(new 
DatanodeConfiguration());
+  }
+
+  @Test
+  public void testWriteEmptyTreeToFile() throws Exception {
+    checksumManager.writeContainerDataTree(container, new 
ContainerMerkleTree());
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+    ContainerProtos.ContainerMerkleTree treeProto = 
checksumInfo.getContainerMerkleTree();
+    assertEquals(0, treeProto.getDataChecksum());
+    assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+  }
+
+  @Test
+  public void testWriteEmptyBlockListToFile() throws Exception {
+    checksumManager.markBlocksAsDeleted(container, new TreeSet<>());
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+    ContainerProtos.ContainerMerkleTree treeProto = 
checksumInfo.getContainerMerkleTree();
+    assertEquals(0, treeProto.getDataChecksum());
+    assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+  }
+
+  @Test
+  public void testWriteOnlyTreeToFile() throws Exception {
+    ContainerMerkleTree tree = buildTestTree();
+    checksumManager.writeContainerDataTree(container, tree);
+
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertTrue(checksumInfo.getDeletedBlocksList().isEmpty());
+    // TestContainerMerkleTree verifies that going from ContainerMerkleTree to 
its proto is consistent.
+    // Therefore, we can use the proto version of our expected tree to check 
what was written to the file.
+    assertTreesSortedAndMatch(tree.toProto(), 
checksumInfo.getContainerMerkleTree());
+  }
+
+  @Test
+  public void testWriteOnlyDeletedBlocksToFile() throws Exception {
+    List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+    checksumManager.markBlocksAsDeleted(container, new 
TreeSet<>(expectedBlocksToDelete));
+
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+    ContainerProtos.ContainerMerkleTree treeProto = 
checksumInfo.getContainerMerkleTree();
+    assertEquals(0, treeProto.getDataChecksum());
+    assertTrue(treeProto.getBlockMerkleTreeList().isEmpty());
+  }
+
+  @Test
+  public void testDeletedBlocksPreservedOnTreeWrite() throws Exception {
+    List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+    checksumManager.markBlocksAsDeleted(container, new 
TreeSet<>(expectedBlocksToDelete));
+    ContainerMerkleTree tree = buildTestTree();
+    checksumManager.writeContainerDataTree(container, tree);
+
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+    assertTreesSortedAndMatch(tree.toProto(), 
checksumInfo.getContainerMerkleTree());
+  }
+
+  @Test
+  public void testTreePreservedOnDeletedBlocksWrite() throws Exception {
+    ContainerMerkleTree tree = buildTestTree();
+    checksumManager.writeContainerDataTree(container, tree);
+    List<Long> expectedBlocksToDelete = Arrays.asList(1L, 2L, 3L);
+    checksumManager.markBlocksAsDeleted(container, new 
TreeSet<>(expectedBlocksToDelete));
+
+    ContainerProtos.ContainerChecksumInfo checksumInfo = readFile();
+
+    assertEquals(CONTAINER_ID, checksumInfo.getContainerID());
+    assertEquals(expectedBlocksToDelete, checksumInfo.getDeletedBlocksList());
+    assertTreesSortedAndMatch(tree.toProto(), 
checksumInfo.getContainerMerkleTree());
+  }
+
+  @Test
+  public void testChecksumTreeFilePath() {
+    assertEquals(checksumFile.getAbsolutePath(), 
checksumManager.getContainerChecksumFile(container).getAbsolutePath());
+  }
+
+  private ContainerMerkleTree buildTestTree() throws Exception {
+    final long blockID1 = 1;
+    final long blockID2 = 2;
+    final long blockID3 = 3;
+    ChunkInfo b1c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo b1c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{4, 5, 6}));
+    ChunkInfo b2c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{7, 8, 9}));
+    ChunkInfo b2c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{12, 11, 10}));
+    ChunkInfo b3c1 = buildChunk(0, ByteBuffer.wrap(new byte[]{13, 14, 15}));
+    ChunkInfo b3c2 = buildChunk(1, ByteBuffer.wrap(new byte[]{16, 17, 18}));
+
+    ContainerMerkleTree tree = new ContainerMerkleTree();
+    tree.addChunks(blockID1, Arrays.asList(b1c1, b1c2));
+    tree.addChunks(blockID2, Arrays.asList(b2c1, b2c2));
+    tree.addChunks(blockID3, Arrays.asList(b3c1, b3c2));
+
+    return tree;
+  }
+
+  private ContainerProtos.ContainerChecksumInfo readFile() throws IOException {
+    try (FileInputStream inStream = new FileInputStream(checksumFile)) {
+      return ContainerProtos.ContainerChecksumInfo.parseFrom(inStream);
+    }
+  }
+}
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java
new file mode 100644
index 0000000000..a93c4f1702
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/checksum/TestContainerMerkleTree.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.checksum;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestContainerMerkleTree {
+  // Chunk size in bytes, read from a default-constructed OzoneConfiguration. Used as the length of every test chunk.
+  private static final long CHUNK_SIZE = (long) new 
OzoneConfiguration().getStorageSize(
+      ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, 
ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT, StorageUnit.BYTES);
+  // Bytes-per-checksum value taken from a default-constructed OzoneClientConfig.
+  private static final int BYTES_PER_CHECKSUM = new 
OzoneClientConfig().getBytesPerChecksum();
+
+  /**
+   * A tree that never had chunks added should serialize with a zero data checksum and no blocks.
+   */
+  @Test
+  public void testBuildEmptyTree() {
+    ContainerProtos.ContainerMerkleTree emptyTreeProto = new ContainerMerkleTree().toProto();
+
+    assertEquals(0, emptyTreeProto.getDataChecksum());
+    assertEquals(0, emptyTreeProto.getBlockMerkleTreeCount());
+  }
+
+  /**
+   * Builds a tree from a single chunk in a single block and checks it against a hand-built expected proto.
+   */
+  @Test
+  public void testBuildOneChunkTree() throws Exception {
+    // One chunk in one block feeds both the expected and the actual tree.
+    final long blockID = 1;
+    ChunkInfo onlyChunk = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+
+    // Expected proto, assembled bottom-up by the helpers in this test class.
+    ContainerProtos.ChunkMerkleTree expectedChunk = buildExpectedChunkTree(onlyChunk);
+    ContainerProtos.BlockMerkleTree expectedBlock =
+        buildExpectedBlockTree(blockID, Collections.singletonList(expectedChunk));
+    ContainerProtos.ContainerMerkleTree expectedProto =
+        buildExpectedContainerTree(Collections.singletonList(expectedBlock));
+
+    // Actual proto, produced by the implementation under test.
+    ContainerMerkleTree treeUnderTest = new ContainerMerkleTree();
+    treeUnderTest.addChunks(blockID, Collections.singletonList(onlyChunk));
+    ContainerProtos.ContainerMerkleTree actualProto = treeUnderTest.toProto();
+
+    assertTreesSortedAndMatch(expectedProto, actualProto);
+
+    // Spot check each level of the generated tree by hand as well.
+    assertNotEquals(0, actualProto.getDataChecksum());
+    assertEquals(1, actualProto.getBlockMerkleTreeCount());
+
+    ContainerProtos.BlockMerkleTree onlyBlock = actualProto.getBlockMerkleTree(0);
+    assertEquals(1, onlyBlock.getBlockID());
+    assertEquals(1, onlyBlock.getChunkMerkleTreeCount());
+    assertNotEquals(0, onlyBlock.getBlockChecksum());
+
+    ContainerProtos.ChunkMerkleTree onlyChunkTree = onlyBlock.getChunkMerkleTree(0);
+    assertEquals(0, onlyChunkTree.getOffset());
+    assertEquals(CHUNK_SIZE, onlyChunkTree.getLength());
+    assertNotEquals(0, onlyChunkTree.getChunkChecksum());
+  }
+
+  /**
+   * Chunk offsets within a block may have gaps. Make sure a block tree can still be built when a chunk in the
+   * middle of the block is absent.
+   */
+  @Test
+  public void testBuildTreeWithMissingChunks() throws Exception {
+    // The same two chunks seed both trees. The chunk at index 1 is deliberately left out.
+    final long blockID = 1;
+    ChunkInfo firstChunk = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo thirdChunk = buildChunk(2, ByteBuffer.wrap(new byte[]{4, 5, 6}));
+
+    // Expected proto, assembled by the helpers in this test class.
+    ContainerProtos.ChunkMerkleTree expectedFirstChunk = buildExpectedChunkTree(firstChunk);
+    ContainerProtos.ChunkMerkleTree expectedThirdChunk = buildExpectedChunkTree(thirdChunk);
+    ContainerProtos.BlockMerkleTree expectedBlock = buildExpectedBlockTree(blockID,
+        Arrays.asList(expectedFirstChunk, expectedThirdChunk));
+    ContainerProtos.ContainerMerkleTree expectedProto =
+        buildExpectedContainerTree(Collections.singletonList(expectedBlock));
+
+    // Actual proto, produced by the implementation under test.
+    ContainerMerkleTree treeUnderTest = new ContainerMerkleTree();
+    treeUnderTest.addChunks(blockID, Arrays.asList(firstChunk, thirdChunk));
+    ContainerProtos.ContainerMerkleTree actualProto = treeUnderTest.toProto();
+
+    assertTreesSortedAndMatch(expectedProto, actualProto);
+  }
+
+  /**
+   * A container is a set of blocks, so the tree implementation must not rely on block IDs being contiguous.
+   */
+  @Test
+  public void testBuildTreeWithNonContiguousBlockIDs() throws Exception {
+    // Block 2 is skipped on purpose. The same chunks seed the expected and actual trees.
+    final long firstBlockID = 1;
+    final long thirdBlockID = 3;
+    ChunkInfo block1Chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block1Chunk2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block3Chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block3Chunk2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+
+    // Expected proto, assembled by the helpers in this test class.
+    ContainerProtos.BlockMerkleTree expectedBlock1 = buildExpectedBlockTree(firstBlockID,
+        Arrays.asList(buildExpectedChunkTree(block1Chunk1), buildExpectedChunkTree(block1Chunk2)));
+    ContainerProtos.BlockMerkleTree expectedBlock3 = buildExpectedBlockTree(thirdBlockID,
+        Arrays.asList(buildExpectedChunkTree(block3Chunk1), buildExpectedChunkTree(block3Chunk2)));
+    ContainerProtos.ContainerMerkleTree expectedProto =
+        buildExpectedContainerTree(Arrays.asList(expectedBlock1, expectedBlock3));
+
+    // Actual proto, produced by the implementation under test.
+    // Blocks and chunks are handed over out of order to exercise sorting.
+    ContainerMerkleTree treeUnderTest = new ContainerMerkleTree();
+    treeUnderTest.addChunks(thirdBlockID, Arrays.asList(block3Chunk2, block3Chunk1));
+    treeUnderTest.addChunks(firstBlockID, Arrays.asList(block1Chunk1, block1Chunk2));
+    ContainerProtos.ContainerMerkleTree actualProto = treeUnderTest.toProto();
+
+    assertTreesSortedAndMatch(expectedProto, actualProto);
+  }
+
+  /**
+   * Chunks for a block may arrive across several addChunks calls, in any order, and may repeat. The finished
+   * tree should still match one built from the complete, sorted set of chunks.
+   */
+  @Test
+  public void testAppendToBlocksWhileBuilding() throws Exception {
+    // The same chunks seed the expected and actual trees.
+    final long firstBlockID = 1;
+    final long secondBlockID = 2;
+    final long thirdBlockID = 3;
+    ChunkInfo block1Chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block1Chunk2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2}));
+    ChunkInfo block1Chunk3 = buildChunk(2, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block2Chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block2Chunk2 = buildChunk(1, ByteBuffer.wrap(new byte[]{1, 2, 3}));
+    ChunkInfo block3Chunk1 = buildChunk(0, ByteBuffer.wrap(new byte[]{1}));
+    ChunkInfo block3Chunk2 = buildChunk(1, ByteBuffer.wrap(new byte[]{2, 3, 4}));
+
+    // Expected proto, assembled by the helpers in this test class.
+    ContainerProtos.BlockMerkleTree expectedBlock1 = buildExpectedBlockTree(firstBlockID,
+        Arrays.asList(
+            buildExpectedChunkTree(block1Chunk1),
+            buildExpectedChunkTree(block1Chunk2),
+            buildExpectedChunkTree(block1Chunk3)));
+    ContainerProtos.BlockMerkleTree expectedBlock2 = buildExpectedBlockTree(secondBlockID,
+        Arrays.asList(buildExpectedChunkTree(block2Chunk1), buildExpectedChunkTree(block2Chunk2)));
+    ContainerProtos.BlockMerkleTree expectedBlock3 = buildExpectedBlockTree(thirdBlockID,
+        Arrays.asList(buildExpectedChunkTree(block3Chunk1), buildExpectedChunkTree(block3Chunk2)));
+    ContainerProtos.ContainerMerkleTree expectedProto =
+        buildExpectedContainerTree(Arrays.asList(expectedBlock1, expectedBlock2, expectedBlock3));
+
+    // Actual proto, produced by the implementation under test.
+    // Chunks are added to the blocks individually and out of order.
+    ContainerMerkleTree treeUnderTest = new ContainerMerkleTree();
+    // All of block 2 goes in first.
+    treeUnderTest.addChunks(secondBlockID, Arrays.asList(block2Chunk1, block2Chunk2));
+    // Block 1 follows in multiple steps with its chunks out of order.
+    treeUnderTest.addChunks(firstBlockID, Collections.singletonList(block1Chunk2));
+    treeUnderTest.addChunks(firstBlockID, Arrays.asList(block1Chunk3, block1Chunk1));
+    // Block 3 receives a duplicate chunk, which should overwrite the existing one.
+    treeUnderTest.addChunks(thirdBlockID, Arrays.asList(block3Chunk1, block3Chunk2));
+    treeUnderTest.addChunks(thirdBlockID, Collections.singletonList(block3Chunk2));
+    ContainerProtos.ContainerMerkleTree actualProto = treeUnderTest.toProto();
+
+    assertTreesSortedAndMatch(expectedProto, actualProto);
+  }
+
+  /**
+   * Asserts that two container merkle tree protos are equal at every level, and that the actual tree lists its
+   * blocks in strictly increasing block ID order and each block's chunks in strictly increasing offset order.
+   *
+   * @param expectedTree The tree holding the expected checksums, blocks and chunks.
+   * @param actualTree The tree being verified.
+   */
+  public static void assertTreesSortedAndMatch(ContainerProtos.ContainerMerkleTree expectedTree,
+      ContainerProtos.ContainerMerkleTree actualTree) {
+    assertEquals(expectedTree.getDataChecksum(), actualTree.getDataChecksum());
+    assertEquals(expectedTree.getBlockMerkleTreeCount(), actualTree.getBlockMerkleTreeCount());
+
+    long prevBlockID = -1;
+    for (int blockIndex = 0; blockIndex < expectedTree.getBlockMerkleTreeCount(); blockIndex++) {
+      ContainerProtos.BlockMerkleTree expectedBlockTree = expectedTree.getBlockMerkleTree(blockIndex);
+      ContainerProtos.BlockMerkleTree actualBlockTree = actualTree.getBlockMerkleTree(blockIndex);
+
+      // Blocks should be sorted by block ID.
+      long currentBlockID = actualBlockTree.getBlockID();
+      assertTrue(prevBlockID < currentBlockID);
+      prevBlockID = currentBlockID;
+
+      assertEquals(expectedBlockTree.getBlockID(), actualBlockTree.getBlockID());
+      assertEquals(expectedBlockTree.getBlockChecksum(), actualBlockTree.getBlockChecksum());
+      // Without this check, extra chunks at the end of the actual block would never be visited by the loop below,
+      // and too few chunks would surface as an index error rather than an assertion failure.
+      assertEquals(expectedBlockTree.getChunkMerkleTreeCount(), actualBlockTree.getChunkMerkleTreeCount());
+
+      long prevChunkOffset = -1;
+      for (int chunkIndex = 0; chunkIndex < expectedBlockTree.getChunkMerkleTreeCount(); chunkIndex++) {
+        ContainerProtos.ChunkMerkleTree expectedChunkTree = expectedBlockTree.getChunkMerkleTree(chunkIndex);
+        ContainerProtos.ChunkMerkleTree actualChunkTree = actualBlockTree.getChunkMerkleTree(chunkIndex);
+
+        // Chunks should be sorted by offset.
+        long currentChunkOffset = actualChunkTree.getOffset();
+        assertTrue(prevChunkOffset < currentChunkOffset);
+        prevChunkOffset = currentChunkOffset;
+
+        assertEquals(expectedChunkTree.getOffset(), actualChunkTree.getOffset());
+        assertEquals(expectedChunkTree.getLength(), actualChunkTree.getLength());
+        assertEquals(expectedChunkTree.getChunkChecksum(), actualChunkTree.getChunkChecksum());
+      }
+    }
+  }
+
+  /**
+   * Assembles the expected container-level proto from already built block trees. The data checksum is computed
+   * over the block checksums in the order the blocks are given.
+   */
+  private ContainerProtos.ContainerMerkleTree buildExpectedContainerTree(List<ContainerProtos.BlockMerkleTree> blocks) {
+    List<Long> blockChecksums = blocks.stream()
+        .map(ContainerProtos.BlockMerkleTree::getBlockChecksum)
+        .collect(Collectors.toList());
+    long expectedDataChecksum = computeExpectedChecksum(blockChecksums);
+
+    return ContainerProtos.ContainerMerkleTree.newBuilder()
+        .addAllBlockMerkleTree(blocks)
+        .setDataChecksum(expectedDataChecksum)
+        .build();
+  }
+
+  /**
+   * Assembles the expected block-level proto from already built chunk trees. The block checksum is computed over
+   * the chunk checksums in the order the chunks are given.
+   */
+  private ContainerProtos.BlockMerkleTree buildExpectedBlockTree(long blockID,
+      List<ContainerProtos.ChunkMerkleTree> chunks) {
+    List<Long> chunkChecksums = chunks.stream()
+        .map(ContainerProtos.ChunkMerkleTree::getChunkChecksum)
+        .collect(Collectors.toList());
+    long expectedBlockChecksum = computeExpectedChecksum(chunkChecksums);
+
+    return ContainerProtos.BlockMerkleTree.newBuilder()
+        .setBlockID(blockID)
+        .setBlockChecksum(expectedBlockChecksum)
+        .addAllChunkMerkleTree(chunks)
+        .build();
+  }
+
+  /**
+   * Assembles the expected chunk-level proto for a chunk, copying its offset and length and collapsing its
+   * individual checksums into a single chunk checksum.
+   */
+  private ContainerProtos.ChunkMerkleTree buildExpectedChunkTree(ChunkInfo chunk) {
+    long expectedChunkChecksum = computeExpectedChunkChecksum(chunk.getChecksumData().getChecksums());
+
+    return ContainerProtos.ChunkMerkleTree.newBuilder()
+        .setOffset(chunk.getOffset())
+        .setLength(chunk.getLen())
+        .setChunkChecksum(expectedChunkChecksum)
+        .build();
+  }
+
+  /**
+   * Creates a ChunkInfo from the given checksums without computing any new checksums. The result can serve as a
+   * leaf of a pre-computed expected merkle tree, or as input to ContainerMerkleTree so that it builds the whole
+   * tree itself.
+   *
+   * @param indexInBlock Position of the chunk within its block. The chunk's offset is derived from this using the
+   *     fixed chunk length CHUNK_SIZE.
+   * @param chunkChecksums The checksums within the chunk. Each is assumed to cover a fixed "bytesPerChecksum"
+   *     amount of data, and they are assumed to be contiguous.
+   * @return The ChunkInfo built from this information.
+   */
+  public static ChunkInfo buildChunk(int indexInBlock, ByteBuffer... chunkChecksums) throws IOException {
+    // All of the supplied checksums are gathered under a single ChecksumData object.
+    List<ByteString> checksumBytes = Arrays.stream(chunkChecksums)
+        .map(ByteString::copyFrom)
+        .collect(Collectors.toList());
+    ContainerProtos.ChecksumData checksumData = ContainerProtos.ChecksumData.newBuilder()
+        .setType(ContainerProtos.ChecksumType.CRC32)
+        .setBytesPerChecksum(BYTES_PER_CHECKSUM)
+        .addAllChecksums(checksumBytes)
+        .build();
+
+    ContainerProtos.ChunkInfo chunkProto = ContainerProtos.ChunkInfo.newBuilder()
+        .setChecksumData(checksumData)
+        .setChunkName("chunk")
+        .setOffset(indexInBlock * CHUNK_SIZE)
+        .setLen(CHUNK_SIZE)
+        .build();
+    return ChunkInfo.getFromProtoBuf(chunkProto);
+  }
+
+  /**
+   * Computes a CRC32 over the given checksums, each written as an 8 byte long in list order.
+   */
+  private long computeExpectedChecksum(List<Long> checksums) {
+    ByteBuffer packedChecksums = ByteBuffer.allocate(Long.BYTES * checksums.size());
+    for (long checksum : checksums) {
+      packedChecksums.putLong(checksum);
+    }
+    // Switch the buffer from writing to reading before handing it to the CRC.
+    packedChecksums.flip();
+
+    CRC32 crc = new CRC32();
+    crc.update(packedChecksums);
+    return crc.getValue();
+  }
+
+  /**
+   * Computes a CRC32 over the raw bytes of the given checksums, fed to the CRC in list order.
+   */
+  private long computeExpectedChunkChecksum(List<ByteString> checksums) {
+    CRC32 crc = new CRC32();
+    for (ByteString checksum : checksums) {
+      crc.update(checksum.asReadOnlyByteBuffer());
+    }
+    return crc.getValue();
+  }
+}
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 7755b993ca..833159c84e 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -527,6 +527,30 @@ message SendContainerRequest {
 message SendContainerResponse {
 }
 
+// Each chunk contains multiple checksums. This message aggregates them into one checksum for the whole chunk.
+message ChunkMerkleTree {
+  // Offset and length of the chunk. Presumably both are in bytes, with the offset relative to the start of the
+  // block; confirm against the code that writes chunks.
+  optional int64 offset = 1;
+  optional int64 length = 2;
+  // Single checksum summarizing all of the chunk's individual checksums.
+  optional int64 chunkChecksum = 3;
+}
+
+// One block's node in the container merkle tree: the block's ID, a checksum for the block, and one
+// ChunkMerkleTree per chunk.
+message BlockMerkleTree {
+  optional int64 blockID = 1;
+  // The tests added with this message expect this to be derived from the chunkChecksum values of the entries in
+  // chunkMerkleTree.
+  optional int64 blockChecksum = 2;
+  // The tests added with this message expect these entries to be ordered by increasing offset.
+  repeated ChunkMerkleTree chunkMerkleTree = 3;
+}
+
+// Root of the merkle tree for a container: a checksum for the container's data and one BlockMerkleTree per block.
+message ContainerMerkleTree {
+  // The tests added with this message expect this to be derived from the blockChecksum values of the entries in
+  // blockMerkleTree, and to be 0 for a tree with no blocks.
+  optional int64 dataChecksum = 1;
+  // The tests added with this message expect these entries to be ordered by increasing blockID.
+  repeated BlockMerkleTree blockMerkleTree = 2;
+}
+
+// Checksum information kept for a single container: its ID, its merkle tree, and a list of deleted blocks.
+message ContainerChecksumInfo {
+  optional int64 containerID = 1;
+  optional ContainerMerkleTree containerMerkleTree = 2;
+  // Presumably the IDs of blocks that have been deleted from the container; confirm against the code that
+  // populates this field.
+  repeated int64 deletedBlocks = 3;
+}
+
 service XceiverClientProtocolService {
   // A client-to-datanode RPC to send container commands
   rpc send(stream ContainerCommandRequestProto) returns


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to