This is an automated email from the ASF dual-hosted git repository.

aswinshakil pushed a commit to branch HDDS-10239-container-reconciliation
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 47c1eaa1eb903175c1729186da48b7bffda85bd8
Merge: 2b4708b70e a355664093
Author: Aswin Shakil Balasubramanian <[email protected]>
AuthorDate: Fri May 16 17:41:00 2025 +0530

    Merge branch 'HDDS-10239-container-reconciliation' of https://github.com/apache/ozone into HDDS-10239-container-reconciliation
    
    Conflicts:
            hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
            hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
            hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java

 .../checksum/ContainerChecksumTreeManager.java     |  47 +-
 .../checksum/ContainerMerkleTreeMetrics.java       |  30 +-
 .../checksum/ContainerMerkleTreeWriter.java        |  72 ++-
 .../ozone/container/common/impl/ContainerSet.java  |  26 +
 .../container/common/impl/HddsDispatcher.java      |   3 +-
 .../ozone/container/common/interfaces/Handler.java |   4 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 413 ++++++++------
 .../container/ozoneimpl/ContainerController.java   |  12 +-
 .../container/ozoneimpl/ContainerScanError.java    |  33 +-
 .../ozoneimpl/OnDemandContainerDataScanner.java    |  76 +--
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   6 +-
 .../checksum/ContainerMerkleTreeTestUtils.java     |   3 +-
 .../checksum/TestContainerMerkleTreeWriter.java    |  63 ++-
 .../container/common/impl/TestContainerSet.java    |  24 +
 ...stContainerReconciliationWithMockDatanodes.java | 623 +++++++++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    | 361 +-----------
 .../TestOnDemandContainerDataScanner.java          |  73 +--
 .../hdds/scm/container/ContainerReplica.java       |   2 +-
 .../TestContainerCommandReconciliation.java        |   6 +-
 19 files changed, 1190 insertions(+), 687 deletions(-)

diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
index aa21389e25,aa21389e25..63154cddd5
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ContainerMerkleTreeMetrics.java
@@@ -30,21 -30,21 +30,6 @@@ import org.apache.hadoop.metrics2.lib.M
  public class ContainerMerkleTreeMetrics {
  private static final String METRICS_SOURCE_NAME = ContainerMerkleTreeMetrics.class.getSimpleName();
  
--  public static ContainerMerkleTreeMetrics create() {
--    MetricsSystem ms = DefaultMetricsSystem.instance();
--    MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
--    if (source != null) {
--      ms.unregisterSource(METRICS_SOURCE_NAME);
--    }
--    return ms.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics",
--        new ContainerMerkleTreeMetrics());
--  }
--
--  public static void unregister() {
--    MetricsSystem ms = DefaultMetricsSystem.instance();
--    ms.unregisterSource(METRICS_SOURCE_NAME);
--  }
--
    @Metric(about = "Number of Merkle tree write failure")
    private MutableCounterLong numMerkleTreeWriteFailure;
  
@@@ -72,6 -72,6 +57,21 @@@
    @Metric(about = "Merkle tree diff latency")
    private MutableRate merkleTreeDiffLatencyNS;
  
++  public static ContainerMerkleTreeMetrics create() {
++    MetricsSystem ms = DefaultMetricsSystem.instance();
++    MetricsSource source = ms.getSource(METRICS_SOURCE_NAME);
++    if (source != null) {
++      ms.unregisterSource(METRICS_SOURCE_NAME);
++    }
++    return ms.register(METRICS_SOURCE_NAME, "Container Merkle Tree Metrics",
++        new ContainerMerkleTreeMetrics());
++  }
++
++  public static void unregister() {
++    MetricsSystem ms = DefaultMetricsSystem.instance();
++    ms.unregisterSource(METRICS_SOURCE_NAME);
++  }
++
    public void incrementMerkleTreeWriteFailures() {
      this.numMerkleTreeWriteFailure.incr();
    }
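
For context, the two hunks above only reorder create() and unregister() below the metric fields; their bodies are unchanged. A minimal lifecycle sketch using the methods shown (the call site itself is hypothetical):

    // Hypothetical call site; create() replaces any previously registered source.
    ContainerMerkleTreeMetrics metrics = ContainerMerkleTreeMetrics.create();
    try {
      metrics.incrementMerkleTreeWriteFailures(); // one of the counters shown above
    } finally {
      ContainerMerkleTreeMetrics.unregister();    // drop the source on shutdown
    }
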
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 0f5c19fd33,8204f58953..7e95518e2c
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@@ -37,7 -37,7 +37,8 @@@ import java.util.concurrent.ConcurrentS
  import java.util.concurrent.ConcurrentSkipListSet;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.function.Consumer;
 +import java.util.function.ToLongFunction;
  import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
  import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
  import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
@@@ -64,27 -64,28 +65,29 @@@ public class ContainerSet implements It
        new ConcurrentSkipListSet<>();
    private final ConcurrentSkipListMap<Long, Long> recoveringContainerMap =
        new ConcurrentSkipListMap<>();
 -  private Clock clock;
 +  private final Clock clock;
    private long recoveringTimeout;
 -  private final Table<Long, String> containerIdsTable;
 +  private final Table<ContainerID, String> containerIdsTable;
+   // Handler that will be invoked when a scan of a container in this set is requested.
+   private Consumer<Container<?>> containerScanHandler;
  
 -  @VisibleForTesting
 -  public ContainerSet(long recoveringTimeout) {
 -    this(new InMemoryTestTable<>(), recoveringTimeout);
 +  public static ContainerSet newReadOnlyContainerSet(long recoveringTimeout) {
 +    return new ContainerSet(null, recoveringTimeout);
 +  }
 +
 +  public static ContainerSet newRwContainerSet(Table<ContainerID, String> containerIdsTable, long recoveringTimeout) {
 +    Objects.requireNonNull(containerIdsTable, "containerIdsTable == null");
 +    return new ContainerSet(containerIdsTable, recoveringTimeout);
    }
  
 -  public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout) {
 -    this(continerIdsTable, recoveringTimeout, false);
 +  private ContainerSet(Table<ContainerID, String> continerIdsTable, long recoveringTimeout) {
 +    this(continerIdsTable, recoveringTimeout, null);
    }
  
 -  public ContainerSet(Table<Long, String> continerIdsTable, long recoveringTimeout, boolean readOnly) {
 -    this.clock = Clock.system(ZoneOffset.UTC);
 +  ContainerSet(Table<ContainerID, String> continerIdsTable, long recoveringTimeout, Clock clock) {
 +    this.clock = clock != null ? clock : Clock.systemUTC();
      this.containerIdsTable = continerIdsTable;
      this.recoveringTimeout = recoveringTimeout;
 -    if (!readOnly && containerIdsTable == null) {
 -      throw new IllegalArgumentException("Container table cannot be null when container set is not read only");
 -    }
    }
  
    public long getCurrentTime() {
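
For context, this hunk replaces the public ContainerSet constructors with factory methods. A sketch of how a call site migrates, assuming a Table<ContainerID, String> named containerIdsTable (variable names illustrative):

    // Old (removed above): new ContainerSet(containerIdsTable, recoveringTimeout)
    // New: read-only vs. read-write intent is explicit, and the RW factory null-checks the table.
    ContainerSet readOnly = ContainerSet.newReadOnlyContainerSet(recoveringTimeout);
    ContainerSet readWrite = ContainerSet.newRwContainerSet(containerIdsTable, recoveringTimeout);
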
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 9c8fed3ca5,e240ff6bf4..a46e097c0b
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@@ -70,9 -71,9 +70,8 @@@ import org.apache.hadoop.ozone.containe
  import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
  import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
  import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 -import org.apache.hadoop.ozone.container.common.volume.VolumeUsage;
  import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScanError;
  import org.apache.hadoop.ozone.container.ozoneimpl.DataScanResult;
- import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
  import org.apache.hadoop.util.Time;
  import org.apache.ratis.statemachine.StateMachine;
  import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 8fa7157091,5feec61a66..26ec6226fc
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@@ -21,8 -21,7 +21,8 @@@ import com.google.common.annotations.Vi
  import java.io.IOException;
  import java.io.InputStream;
  import java.io.OutputStream;
 +import java.time.Clock;
- import java.util.Set;
+ import java.util.Collection;
  import org.apache.hadoop.hdds.conf.ConfigurationSource;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
index db558f4847,db558f4847..1b167dabd8
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerScanError.java
@@@ -23,22 -23,22 +23,6 @@@ import java.io.File
   * This class is used to identify any error that may be seen while scanning a container.
   */
  public class ContainerScanError {
--  /**
--   * Represents the reason a container scan failed and a container should
--   * be marked unhealthy.
--   */
--  public enum FailureType {
--    MISSING_CONTAINER_DIR,
--    MISSING_METADATA_DIR,
--    MISSING_CONTAINER_FILE,
--    MISSING_CHUNKS_DIR,
--    MISSING_CHUNK_FILE,
--    CORRUPT_CONTAINER_FILE,
--    CORRUPT_CHUNK,
--    INCONSISTENT_CHUNK_LENGTH,
--    INACCESSIBLE_DB,
--    WRITE_FAILURE,
--  }
  
    private final File unhealthyFile;
    private final FailureType failureType;
@@@ -66,4 -66,4 +50,21 @@@
    public String toString() {
      return failureType + " for file " + unhealthyFile + " with exception: " + exception;
    }
++
++  /**
++   * Represents the reason a container scan failed and a container should
++   * be marked unhealthy.
++   */
++  public enum FailureType {
++    MISSING_CONTAINER_DIR,
++    MISSING_METADATA_DIR,
++    MISSING_CONTAINER_FILE,
++    MISSING_CHUNKS_DIR,
++    MISSING_CHUNK_FILE,
++    CORRUPT_CONTAINER_FILE,
++    CORRUPT_CHUNK,
++    INCONSISTENT_CHUNK_LENGTH,
++    INACCESSIBLE_DB,
++    WRITE_FAILURE,
++  }
  }
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
index 2b66ead23b,6197657114..ad30aa0c43
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OnDemandContainerDataScanner.java
@@@ -42,11 -41,9 +41,9 @@@ import org.slf4j.LoggerFactory
   * Class for performing on demand scans of containers.
   */
  public final class OnDemandContainerDataScanner {
 -  public static final Logger LOG =
 +  private static final Logger LOG =
        LoggerFactory.getLogger(OnDemandContainerDataScanner.class);
  
-   private static volatile OnDemandContainerDataScanner instance;
- 
    private final ExecutorService scanExecutor;
    private final ContainerController containerController;
    private final DataTransferThrottler throttler;
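
For context, this hunk removes the static singleton; the scanner is now a per-ContainerSet instance. The new test in this commit wires it up roughly like this (condensed from TestContainerReconciliationWithMockDatanodes below; conf and controller are assumed in scope):

    OnDemandContainerDataScanner scanner = new OnDemandContainerDataScanner(
        conf.getObject(ContainerScannerConfiguration.class), controller);
    // Scan requests made through the container set are routed to this scanner.
    containerSet.registerContainerScanHandler(scanner::scanContainer);
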
diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
index 0000000000,d290cea5bb..ced198eecc
mode 000000,100644..100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestContainerReconciliationWithMockDatanodes.java
@@@ -1,0 -1,621 +1,623 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements. See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License. You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.ozone.container.keyvalue;
+ 
+ import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+ import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+ import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
+ import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
++import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+ import static org.assertj.core.api.Assertions.assertThat;
+ import static org.assertj.core.api.Assertions.fail;
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.mockito.ArgumentMatchers.any;
+ import static org.mockito.ArgumentMatchers.anyLong;
+ import static org.mockito.ArgumentMatchers.anyMap;
+ 
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.UncheckedIOException;
+ import java.nio.ByteBuffer;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Optional;
+ import java.util.Random;
+ import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeoutException;
+ import java.util.function.Function;
+ import java.util.stream.Collectors;
+ import java.util.stream.Stream;
+ import org.apache.commons.io.IOUtils;
+ import org.apache.commons.text.RandomStringGenerator;
+ import org.apache.hadoop.hdds.HddsUtils;
+ import org.apache.hadoop.hdds.client.BlockID;
+ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+ import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+ import org.apache.hadoop.hdds.utils.db.BatchOperation;
+ import org.apache.hadoop.ozone.OzoneConsts;
+ import org.apache.hadoop.ozone.common.Checksum;
+ import org.apache.hadoop.ozone.common.ChecksumData;
+ import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
+ import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
+ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
+ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+ import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
+ import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+ import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
+ import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
+ import org.apache.ozone.test.GenericTestUtils;
+ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.Assertions;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.io.TempDir;
+ import org.junit.jupiter.params.provider.Arguments;
+ import org.junit.jupiter.params.provider.MethodSource;
+ import org.mockito.MockedStatic;
+ import org.mockito.Mockito;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * This unit test simulates three datanodes with replicas of a container that need to be reconciled.
+  * It creates three KeyValueHandler instances to represent each datanode; each instance works on a container
+  * replica stored in a local directory. The reconciliation client is mocked to return the corresponding local
+  * container for each datanode peer.
+  */
+ public class TestContainerReconciliationWithMockDatanodes {
++
++  public static final Logger LOG = LoggerFactory.getLogger(TestContainerReconciliationWithMockDatanodes.class);
++
++  // All container replicas will be placed in this directory, and the same replicas will be re-used for each test run.
++  @TempDir
++  private static Path containerDir;
++  private static DNContainerOperationClient dnClient;
++  private static MockedStatic<ContainerProtocolCalls> containerProtocolMock;
++  private static List<MockDatanode> datanodes;
++  private static long healthyDataChecksum;
++
++  private static final String CLUSTER_ID = UUID.randomUUID().toString();
++  private static final long CONTAINER_ID = 100L;
++  private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB;
++  private static final int CHUNKS_PER_BLOCK = 4;
++  private static final int NUM_DATANODES = 3;
++
+   /**
+    * Number of corrupt blocks and chunks.
+    *
+    * TODO HDDS-11942 support more combinations of corruptions.
+    */
+   public static Stream<Arguments> corruptionValues() {
+     return Stream.of(
+         Arguments.of(5, 0),
+         Arguments.of(0, 5),
+         Arguments.of(0, 10),
+         Arguments.of(10, 0),
+         Arguments.of(5, 10),
+         Arguments.of(10, 5),
+         Arguments.of(2, 3),
+         Arguments.of(3, 2),
+         Arguments.of(4, 6),
+         Arguments.of(6, 4),
+         Arguments.of(6, 9),
+         Arguments.of(9, 6)
+     );
+   }
+ 
 -  public static final Logger LOG = LoggerFactory.getLogger(TestContainerReconciliationWithMockDatanodes.class);
 -
 -  // All container replicas will be placed in this directory, and the same replicas will be re-used for each test run.
 -  @TempDir
 -  private static Path containerDir;
 -  private static DNContainerOperationClient dnClient;
 -  private static MockedStatic<ContainerProtocolCalls> containerProtocolMock;
 -  private static List<MockDatanode> datanodes;
 -  private static long healthyDataChecksum;
 -
 -  private static final String CLUSTER_ID = UUID.randomUUID().toString();
 -  private static final long CONTAINER_ID = 100L;
 -  private static final int CHUNK_LEN = 3 * (int) OzoneConsts.KB;
 -  private static final int CHUNKS_PER_BLOCK = 4;
 -  private static final int NUM_DATANODES = 3;
 -
+   /**
+    * Use the same container instances throughout the tests. Each reconciliation should make a full repair, resetting
+    * the state for the next test.
+    */
+   @BeforeAll
+   public static void setup() throws Exception {
+     LOG.info("Data written to {}", containerDir);
+     dnClient = new DNContainerOperationClient(new OzoneConfiguration(), null, null);
+     datanodes = new ArrayList<>();
+ 
+     // Create a container with 15 blocks and 3 replicas.
+     for (int i = 0; i < NUM_DATANODES; i++) {
+       DatanodeDetails dnDetails = randomDatanodeDetails();
+       // Use this fake host name to track the node through the test since it's easier to visualize than a UUID.
+       dnDetails.setHostName("dn" + (i + 1));
+       MockDatanode dn = new MockDatanode(dnDetails, containerDir);
+       dn.addContainerWithBlocks(CONTAINER_ID, 15);
+       datanodes.add(dn);
+     }
+ 
+     datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));
+     healthyDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1);
+     // Do not count the initial synchronous scan to build the merkle tree towards the scan count in the tests.
+     // This lets each test run start counting the number of scans from zero.
+     datanodes.forEach(MockDatanode::resetOnDemandScanCount);
+ 
+     containerProtocolMock = Mockito.mockStatic(ContainerProtocolCalls.class);
+     mockContainerProtocolCalls();
+   }
+ 
+   @AfterEach
+   public void reset() {
+     datanodes.forEach(MockDatanode::resetOnDemandScanCount);
+   }
+ 
+   @AfterAll
+   public static void teardown() {
+     if (containerProtocolMock != null) {
+       containerProtocolMock.close();
+     }
+   }
+ 
+   // TODO HDDS-10374: once the on-demand scanner can build merkle trees, this test should pass.
+   // @ParameterizedTest
+   @MethodSource("corruptionValues")
+   public void testContainerReconciliation(int numBlocksToDelete, int numChunksToCorrupt) throws Exception {
+     LOG.info("Healthy data checksum for container {} in this test is {}", CONTAINER_ID,
+         HddsUtils.checksumToString(healthyDataChecksum));
+     // Introduce corruption in each container on different replicas.
+     List<MockDatanode> dnsToCorrupt = datanodes.stream().limit(2).collect(Collectors.toList());
+ 
+     dnsToCorrupt.get(0).introduceCorruption(CONTAINER_ID, numBlocksToDelete, numChunksToCorrupt, false);
+     dnsToCorrupt.get(1).introduceCorruption(CONTAINER_ID, numBlocksToDelete, numChunksToCorrupt, true);
+     // Use synchronous on-demand scans to re-build the merkle trees after corruption.
+     datanodes.forEach(d -> d.scanContainer(CONTAINER_ID));
+     // Without reconciliation, checksums should be different because of the corruption.
+     assertUniqueChecksumCount(CONTAINER_ID, datanodes, 3);
+ 
+     // Each datanode should have had one on-demand scan during test setup, and a second one after corruption was
+     // introduced.
+     waitForExpectedScanCount(1);
+ 
+     // Reconcile each datanode with its peers.
+     // In a real cluster, SCM will not send a command to reconcile a datanode with itself.
+     for (MockDatanode current : datanodes) {
+       List<DatanodeDetails> peers = datanodes.stream()
+           .map(MockDatanode::getDnDetails)
+           .filter(other -> !current.getDnDetails().equals(other))
+           .collect(Collectors.toList());
+       current.reconcileContainer(dnClient, peers, CONTAINER_ID);
+     }
+     // Reconciliation should have triggered a second on-demand scan for each replica. Wait for them to finish before
+     // checking the results.
+     waitForExpectedScanCount(2);
+     // After reconciliation, checksums should be the same for all containers.
+     long repairedDataChecksum = assertUniqueChecksumCount(CONTAINER_ID, datanodes, 1);
+     assertEquals(healthyDataChecksum, repairedDataChecksum);
+   }
+ 
+   /**
+    * Uses the on-demand container scanner metrics to wait for the expected number of on-demand scans to complete on
+    * every datanode.
+    */
+   private void waitForExpectedScanCount(int expectedCount) throws Exception {
+     for (MockDatanode datanode: datanodes) {
+       try {
+         GenericTestUtils.waitFor(() -> datanode.getOnDemandScanCount() == expectedCount, 100, 10_000);
+       } catch (TimeoutException ex) {
+         LOG.error("Timed out waiting for on-demand scan count {} to reach 
expected count {} on datanode {}",
+             datanode.getOnDemandScanCount(), expectedCount, datanode);
+         throw ex;
+       }
+     }
+   }
+ 
+   /**
+    * Checks for the expected number of unique checksums for a container across the provided datanodes.
+    * @return The data checksum from one of the nodes. Useful if expectedUniqueChecksums = 1.
+    */
+   private static long assertUniqueChecksumCount(long containerID, Collection<MockDatanode> nodes,
+       long expectedUniqueChecksums) {
+     long actualUniqueChecksums = nodes.stream()
+         .mapToLong(d -> d.checkAndGetDataChecksum(containerID))
+         .distinct()
+         .count();
+     assertEquals(expectedUniqueChecksums, actualUniqueChecksums);
+     return nodes.stream().findAny().get().checkAndGetDataChecksum(containerID);
+   }
+ 
+   private static void mockContainerProtocolCalls() {
+     Map<DatanodeDetails, MockDatanode> dnMap = datanodes.stream()
+         .collect(Collectors.toMap(MockDatanode::getDnDetails, Function.identity()));
+ 
+     // Mock getContainerChecksumInfo
+     containerProtocolMock.when(() -> ContainerProtocolCalls.getContainerChecksumInfo(any(), anyLong(), any()))
+         .thenAnswer(inv -> {
+           XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+           long containerID = inv.getArgument(1);
+           Pipeline pipeline = xceiverClientSpi.getPipeline();
+           assertEquals(1, pipeline.size());
+           DatanodeDetails dn = pipeline.getFirstNode();
+           return dnMap.get(dn).getChecksumInfo(containerID);
+         });
+ 
+     // Mock getBlock
+     containerProtocolMock.when(() -> ContainerProtocolCalls.getBlock(any(), any(), any(), any(), anyMap()))
+         .thenAnswer(inv -> {
+           XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+           BlockID blockID = inv.getArgument(2);
+           Pipeline pipeline = xceiverClientSpi.getPipeline();
+           assertEquals(1, pipeline.size());
+           DatanodeDetails dn = pipeline.getFirstNode();
+           return dnMap.get(dn).getBlock(blockID);
+         });
+ 
+     // Mock readChunk
+     containerProtocolMock.when(() -> ContainerProtocolCalls.readChunk(any(), any(), any(), any(), any()))
+         .thenAnswer(inv -> {
+           XceiverClientSpi xceiverClientSpi = inv.getArgument(0);
+           ContainerProtos.ChunkInfo chunkInfo = inv.getArgument(1);
+           ContainerProtos.DatanodeBlockID blockId = inv.getArgument(2);
+           List<XceiverClientSpi.Validator> checksumValidators = inv.getArgument(3);
+           Pipeline pipeline = xceiverClientSpi.getPipeline();
+           assertEquals(1, pipeline.size());
+           DatanodeDetails dn = pipeline.getFirstNode();
+           return dnMap.get(dn).readChunk(blockId, chunkInfo, checksumValidators);
+         });
+ 
+     containerProtocolMock.when(() -> ContainerProtocolCalls.toValidatorList(any())).thenCallRealMethod();
+   }
+ 
+   /**
+    * This class wraps a KeyValueHandler instance with just enough features to test its reconciliation functionality.
+    */
+   private static class MockDatanode {
+     private final KeyValueHandler handler;
+     private final DatanodeDetails dnDetails;
+     private final OnDemandContainerDataScanner onDemandScanner;
+     private final ContainerSet containerSet;
+     private final OzoneConfiguration conf;
+ 
+     private final Logger log;
+ 
+     MockDatanode(DatanodeDetails dnDetails, Path tempDir) throws IOException {
+       this.dnDetails = dnDetails;
+       log = LoggerFactory.getLogger("mock-datanode-" + dnDetails.getHostName());
+       Path dataVolume = Paths.get(tempDir.toString(), dnDetails.getHostName(), "data");
+       Path metadataVolume = Paths.get(tempDir.toString(), dnDetails.getHostName(), "metadata");
+ 
+       this.conf = new OzoneConfiguration();
+       conf.set(HDDS_DATANODE_DIR_KEY, dataVolume.toString());
+       conf.set(OZONE_METADATA_DIRS, metadataVolume.toString());
+ 
 -      containerSet = new ContainerSet(1000);
++      containerSet = newContainerSet();
+       MutableVolumeSet volumeSet = createVolumeSet();
+       handler = ContainerTestUtils.getKeyValueHandler(conf, dnDetails.getUuidString(), containerSet, volumeSet);
+       handler.setClusterID(CLUSTER_ID);
+ 
+       ContainerController controller = new ContainerController(containerSet,
+           Collections.singletonMap(ContainerProtos.ContainerType.KeyValueContainer, handler));
+       onDemandScanner = new OnDemandContainerDataScanner(
+           conf.getObject(ContainerScannerConfiguration.class), controller);
+       // Register the on-demand container scanner with the container set used by the KeyValueHandler.
+       containerSet.registerContainerScanHandler(onDemandScanner::scanContainer);
+     }
+ 
+     public DatanodeDetails getDnDetails() {
+       return dnDetails;
+     }
+ 
+     /**
+      * @throws IOException for general IO errors accessing the checksum file
+      * @throws java.io.FileNotFoundException When the checksum file does not exist.
+      */
+     public ContainerProtos.GetContainerChecksumInfoResponseProto getChecksumInfo(long containerID) throws IOException {
+       KeyValueContainer container = getContainer(containerID);
+       ByteString checksumInfo = handler.getChecksumManager().getContainerChecksumInfo(container.getContainerData());
+       return ContainerProtos.GetContainerChecksumInfoResponseProto.newBuilder()
+           .setContainerID(containerID)
+           .setContainerChecksumInfo(checksumInfo)
+           .build();
+     }
+ 
+     /**
+      * Verifies that the data checksum on disk matches the one in memory, and returns the data checksum.
+      */
+     public long checkAndGetDataChecksum(long containerID) {
+       KeyValueContainer container = getContainer(containerID);
+       long dataChecksum = 0;
+       try {
+         Optional<ContainerProtos.ContainerChecksumInfo> containerChecksumInfo =
+             handler.getChecksumManager().read(container.getContainerData());
+         assertTrue(containerChecksumInfo.isPresent());
+         dataChecksum = containerChecksumInfo.get().getContainerMerkleTree().getDataChecksum();
+         assertEquals(container.getContainerData().getDataChecksum(), dataChecksum);
+       } catch (IOException ex) {
+         fail("Failed to read container checksum from disk", ex);
+       }
+       log.info("Retrieved data checksum {} from container {}", 
HddsUtils.checksumToString(dataChecksum),
+           containerID);
+       return dataChecksum;
+     }
+ 
+     public ContainerProtos.GetBlockResponseProto getBlock(BlockID blockID) throws IOException {
+       KeyValueContainer container = getContainer(blockID.getContainerID());
+       ContainerProtos.BlockData blockData = handler.getBlockManager().getBlock(container, blockID).getProtoBufMessage();
+       return ContainerProtos.GetBlockResponseProto.newBuilder()
+           .setBlockData(blockData)
+           .build();
+     }
+ 
+     public ContainerProtos.ReadChunkResponseProto readChunk(ContainerProtos.DatanodeBlockID blockId,
+         ContainerProtos.ChunkInfo chunkInfo, List<XceiverClientSpi.Validator> validators) throws IOException {
+       KeyValueContainer container = getContainer(blockId.getContainerID());
+       ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
+           ContainerProtos.ReadChunkResponseProto.newBuilder()
+               .setBlockID(blockId)
+               .setChunkData(chunkInfo)
+               .setData(handler.getChunkManager().readChunk(container, BlockID.getFromProtobuf(blockId),
+                   ChunkInfo.getFromProtoBuf(chunkInfo), null).toByteString())
+               .build();
+       verifyChecksums(readChunkResponseProto, blockId, chunkInfo, validators);
+       return readChunkResponseProto;
+     }
+ 
+     public void verifyChecksums(ContainerProtos.ReadChunkResponseProto readChunkResponseProto,
+         ContainerProtos.DatanodeBlockID blockId, ContainerProtos.ChunkInfo chunkInfo,
+         List<XceiverClientSpi.Validator> validators) throws IOException {
+       assertFalse(validators.isEmpty());
+       ContainerProtos.ContainerCommandRequestProto requestProto =
+           ContainerProtos.ContainerCommandRequestProto.newBuilder()
+               .setCmdType(ContainerProtos.Type.ReadChunk)
+               .setContainerID(blockId.getContainerID())
+               .setDatanodeUuid(dnDetails.getUuidString())
+               .setReadChunk(
+                   ContainerProtos.ReadChunkRequestProto.newBuilder()
+                       .setBlockID(blockId)
+                       .setChunkData(chunkInfo)
+                       .build())
+               .build();
+       ContainerProtos.ContainerCommandResponseProto responseProto =
+           ContainerProtos.ContainerCommandResponseProto.newBuilder()
+               .setCmdType(ContainerProtos.Type.ReadChunk)
+               .setResult(ContainerProtos.Result.SUCCESS)
+               .setReadChunk(readChunkResponseProto).build();
+       for (XceiverClientSpi.Validator function : validators) {
+         function.accept(requestProto, responseProto);
+       }
+     }
+ 
+     public KeyValueContainer getContainer(long containerID) {
+       return (KeyValueContainer) containerSet.getContainer(containerID);
+     }
+ 
+     /**
+      * Triggers a synchronous scan of the container. This method will block until the scan completes.
+      */
+     public void scanContainer(long containerID) {
+       Optional<Future<?>> scanFuture = onDemandScanner.scanContainer(containerSet.getContainer(containerID));
+       assertTrue(scanFuture.isPresent());
+ 
+       try {
+         scanFuture.get().get();
+       } catch (InterruptedException | ExecutionException e) {
+         fail("On demand container scan failed", e);
+       }
+     }
+ 
+     public int getOnDemandScanCount() {
+       return onDemandScanner.getMetrics().getNumContainersScanned();
+     }
+ 
+     public void resetOnDemandScanCount() {
+       onDemandScanner.getMetrics().resetNumContainersScanned();
+     }
+ 
+     public void reconcileContainer(DNContainerOperationClient client, Collection<DatanodeDetails> peers,
+         long containerID) {
+       log.info("Beginning reconciliation on this mock datanode");
+       try {
+         handler.reconcileContainer(client, containerSet.getContainer(containerID), peers);
+       } catch (IOException ex) {
+         fail("Container reconciliation failed", ex);
+       }
+     }
+ 
+     /**
+      * Create a container with the specified number of blocks. Block data is human-readable so the block files can be
+      * inspected when debugging the test.
+      */
+     public void addContainerWithBlocks(long containerId, int blocks) throws Exception {
+       ContainerProtos.CreateContainerRequestProto createRequest =
+           ContainerProtos.CreateContainerRequestProto.newBuilder()
+               .setContainerType(ContainerProtos.ContainerType.KeyValueContainer)
+               .build();
+       ContainerProtos.ContainerCommandRequestProto request =
+           ContainerProtos.ContainerCommandRequestProto.newBuilder()
+               .setCmdType(ContainerProtos.Type.CreateContainer)
+               .setCreateContainer(createRequest)
+               .setContainerID(containerId)
+               .setDatanodeUuid(dnDetails.getUuidString())
+               .build();
+ 
+       handler.handleCreateContainer(request, null);
+       KeyValueContainer container = getContainer(containerId);
+ 
+       // Verify container is initially empty.
+       File chunksPath = new File(container.getContainerData().getChunksPath());
+       ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, 0, 0);
+ 
+       // Create data to put in the container.
+       // Seed using the container ID so that all replicas are identical.
+       RandomStringGenerator generator = new RandomStringGenerator.Builder()
+           .withinRange('a', 'z')
+           .usingRandom(new Random(containerId)::nextInt)
+           .get();
+ 
+       // This array will keep getting populated with new bytes for each chunk.
+       byte[] chunkData = new byte[CHUNK_LEN];
+       int bytesPerChecksum = 2 * (int) OzoneConsts.KB;
+ 
+       // Add data to the container.
+       List<ContainerProtos.ChunkInfo> chunkList = new ArrayList<>();
+       for (int i = 0; i < blocks; i++) {
+         BlockID blockID = new BlockID(containerId, i);
+         BlockData blockData = new BlockData(blockID);
+ 
+         chunkList.clear();
+         for (long chunkCount = 0; chunkCount < CHUNKS_PER_BLOCK; chunkCount++) {
+           String chunkName = "chunk" + chunkCount;
+           long offset = chunkCount * chunkData.length;
+           ChunkInfo info = new ChunkInfo(chunkName, offset, chunkData.length);
+ 
+           // Generate data for the chunk and compute its checksum.
+           // Data is generated as one ASCII character per line, so block files are human-readable if further
+           // debugging is needed.
+           for (int c = 0; c < chunkData.length; c += 2) {
+             chunkData[c] = (byte)generator.generate(1).charAt(0);
+             chunkData[c + 1] = (byte)'\n';
+           }
+ 
+           Checksum checksum = new Checksum(ContainerProtos.ChecksumType.CRC32, bytesPerChecksum);
+           ChecksumData checksumData = checksum.computeChecksum(chunkData);
+           info.setChecksumData(checksumData);
+           // Write chunk and checksum into the container.
+           chunkList.add(info.getProtoBufMessage());
+           handler.getChunkManager().writeChunk(container, blockID, info,
+               ByteBuffer.wrap(chunkData), WRITE_STAGE);
+         }
+         handler.getChunkManager().finishWriteChunks(container, blockData);
+         blockData.setChunks(chunkList);
+         blockData.setBlockCommitSequenceId(i);
+         handler.getBlockManager().putBlock(container, blockData);
+       }
+       ContainerLayoutTestInfo.FILE_PER_BLOCK.validateFileCount(chunksPath, blocks, (long) blocks * CHUNKS_PER_BLOCK);
+       container.markContainerForClose();
+       handler.closeContainer(container);
+     }
+ 
+     @Override
+     public String toString() {
+       return dnDetails.toString();
+     }
+ 
+     /**
+      * Returns a list of all blocks in the container sorted numerically by blockID.
+      * For example, the unsorted list would have the first blocks as 1, 10, 11...
+      * The list returned by this method would have the first blocks as 1, 2, 3...
+      */
+     private List<BlockData> getSortedBlocks(KeyValueContainer container) throws IOException {
+       List<BlockData> blockDataList = handler.getBlockManager().listBlock(container, -1, 100);
+       blockDataList.sort(Comparator.comparingLong(BlockData::getLocalID));
+       return blockDataList;
+     }
+ 
+     /**
+      * Introduce corruption in the container.
+      * 1. Delete blocks from the container.
+      * 2. Corrupt chunks at an offset.
+      * If reverse is true, the blocks are deleted and the chunks corrupted in reverse order.
+      */
+     public void introduceCorruption(long containerID, int numBlocksToDelete, int numChunksToCorrupt, boolean reverse)
+         throws IOException {
+       KeyValueContainer container = getContainer(containerID);
+       KeyValueContainerData containerData = container.getContainerData();
+       // Simulate missing blocks
+       try (DBHandle handle = BlockUtils.getDB(containerData, conf);
+            BatchOperation batch = handle.getStore().getBatchHandler().initBatchOperation()) {
+         List<BlockData> blockDataList = getSortedBlocks(container);
+         int size = blockDataList.size();
+         for (int i = 0; i < numBlocksToDelete; i++) {
+           BlockData blockData = reverse ? blockDataList.get(size - 1 - i) : blockDataList.get(i);
+           File blockFile = TestContainerCorruptions.getBlock(container, blockData.getBlockID().getLocalID());
+           Assertions.assertTrue(blockFile.delete());
+           handle.getStore().getBlockDataTable().deleteWithBatch(batch,
+               containerData.getBlockKey(blockData.getLocalID()));
+           log.info("Deleting block {} from container {}", 
blockData.getBlockID().getLocalID(), containerID);
+         }
+         handle.getStore().getBatchHandler().commitBatchOperation(batch);
+         // Check that the correct number of blocks were deleted.
+         blockDataList = getSortedBlocks(container);
+         assertEquals(numBlocksToDelete, size - blockDataList.size());
+       }
+ 
+       // Corrupt chunks at an offset.
+       List<BlockData> blockDataList = getSortedBlocks(container);
+       int size = blockDataList.size();
+       for (int i = 0; i < numChunksToCorrupt; i++) {
+         int blockIndex = reverse ? size - 1 - (i % size) : i % size;
+         BlockData blockData = blockDataList.get(blockIndex);
+         int chunkIndex = i / size;
+         File blockFile = TestContainerCorruptions.getBlock(container, blockData.getBlockID().getLocalID());
+         List<ContainerProtos.ChunkInfo> chunks = new ArrayList<>(blockData.getChunks());
+         ContainerProtos.ChunkInfo chunkInfo = chunks.remove(chunkIndex);
+         corruptFileAtOffset(blockFile, chunkInfo.getOffset(), chunkInfo.getLen());
+         log.info("Corrupting block {} at offset {} in container {}", blockData.getBlockID().getLocalID(),
+             chunkInfo.getOffset(), containerID);
+       }
+     }
+ 
+     private MutableVolumeSet createVolumeSet() throws IOException {
+       MutableVolumeSet volumeSet = new MutableVolumeSet(dnDetails.getUuidString(), conf, null,
+           StorageVolume.VolumeType.DATA_VOLUME, null);
+       createDbInstancesForTestIfNeeded(volumeSet, CLUSTER_ID, CLUSTER_ID, conf);
+       return volumeSet;
+     }
+ 
+     /**
+      * Overwrite the file with corrupted bytes at an offset within the given chunk length.
+      */
+     private static void corruptFileAtOffset(File file, long offset, long chunkLength) {
+       try {
+         final int fileLength = (int) file.length();
+         assertTrue(fileLength >= offset + chunkLength);
+         final int chunkEnd = (int)(offset + chunkLength);
+ 
+         Path path = file.toPath();
+         final byte[] original = IOUtils.readFully(Files.newInputStream(path), fileLength);
+ 
+         // Corrupt the last byte and middle bytes of the block. The scanner should log this as two errors.
+         final byte[] corruptedBytes = Arrays.copyOf(original, fileLength);
+         corruptedBytes[chunkEnd - 1] = (byte) (original[chunkEnd - 1] << 1);
+         final long chunkMid = offset + (chunkLength - offset) / 2;
+         corruptedBytes[(int) (chunkMid / 2)] = (byte) (original[(int) (chunkMid / 2)] << 1);
+ 
+ 
+         Files.write(path, corruptedBytes,
+             StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.SYNC);
+ 
+         assertThat(IOUtils.readFully(Files.newInputStream(path), fileLength))
+             .isEqualTo(corruptedBytes)
+             .isNotEqualTo(original);
+       } catch (IOException ex) {
+         // Fail the test.
+         throw new UncheckedIOException(ex);
+       }
+     }
+   }
+ }
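
To run only this new test locally, standard Maven Surefire selection should work (module path taken from the diff). Note that the @ParameterizedTest annotation is still commented out pending HDDS-10374, so the reconciliation case does not execute yet:

    mvn -pl hadoop-hdds/container-service test -Dtest=TestContainerReconciliationWithMockDatanodes
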
diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index da1bb06471,7530a33327..95ed16b2d9
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@@ -17,7 -17,7 +17,6 @@@
  
  package org.apache.hadoop.ozone.container.keyvalue;
  
- import static java.nio.charset.StandardCharsets.UTF_8;
 -import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
  import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
  import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
  import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
@@@ -26,28 -26,20 +25,22 @@@ import static org.apache.hadoop.hdds.pr
  import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
  import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY;
  import static org.apache.hadoop.ozone.OzoneConsts.GB;
- import static org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager.getContainerChecksumFile;
- import static org.apache.hadoop.ozone.container.checksum.ContainerMerkleTreeTestUtils.writeContainerDataTreeProto;
- import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_STAGE;
  import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createBlockMetaData;
- import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
 +import static org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
- import static org.apache.hadoop.ozone.container.keyvalue.TestContainerCorruptions.getBlock;
  import static org.assertj.core.api.Assertions.assertThat;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertNotEquals;
  import static org.junit.jupiter.api.Assertions.assertNotNull;
  import static org.junit.jupiter.api.Assertions.assertNull;
 -import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.mockito.ArgumentMatchers.any;
  import static org.mockito.ArgumentMatchers.anyLong;
- import static org.mockito.ArgumentMatchers.anyMap;
  import static org.mockito.ArgumentMatchers.eq;
 -import static org.mockito.Mockito.any;
  import static org.mockito.Mockito.atMostOnce;
 +import static org.mockito.Mockito.doAnswer;
  import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.reset;
 +import static org.mockito.Mockito.spy;
  import static org.mockito.Mockito.times;
  import static org.mockito.Mockito.verify;
  import static org.mockito.Mockito.when;
@@@ -64,26 -52,13 +53,15 @@@ import java.time.Clock
  import java.util.Collections;
  import java.util.EnumSet;
  import java.util.HashMap;
- import java.util.HashSet;
  import java.util.List;
- import java.util.Map;
- import java.util.Optional;
- import java.util.Random;
  import java.util.Set;
  import java.util.UUID;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.Semaphore;
  import java.util.concurrent.atomic.AtomicInteger;
- import java.util.stream.Stream;
  import org.apache.commons.io.FileUtils;
- import org.apache.commons.io.IOUtils;
- import org.apache.commons.lang3.RandomStringUtils;
  import org.apache.hadoop.conf.StorageUnit;
  import org.apache.hadoop.fs.FileUtil;
- import org.apache.hadoop.hdds.client.BlockID;
- import org.apache.hadoop.hdds.conf.ConfigurationSource;
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
@@@ -92,17 -67,9 +70,10 @@@ import org.apache.hadoop.hdds.protocol.
  import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
  import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
  import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
- import org.apache.hadoop.hdds.scm.XceiverClientSpi;
  import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
- import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
- import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
  import org.apache.hadoop.hdds.security.token.TokenVerifier;
- import org.apache.hadoop.hdds.utils.db.BatchOperation;
- import org.apache.hadoop.ozone.OzoneConsts;
- import org.apache.hadoop.ozone.common.Checksum;
- import org.apache.hadoop.ozone.common.ChecksumData;
  import org.apache.hadoop.ozone.container.checksum.ContainerChecksumTreeManager;
  import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
  import org.apache.hadoop.ozone.container.common.ContainerTestUtils;
@@@ -121,27 -85,19 +89,22 @@@ import org.apache.hadoop.ozone.containe
  import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
  import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
  import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
- import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
  import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
  import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
- import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
+ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+ import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
+ import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandContainerDataScanner;
  import org.apache.hadoop.util.Sets;
 +import org.apache.hadoop.util.Time;
  import org.apache.ozone.test.GenericTestUtils;
 +import org.apache.ozone.test.GenericTestUtils.LogCapturer;
- import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
  import org.junit.jupiter.api.Assertions;
  import org.junit.jupiter.api.BeforeEach;
  import org.junit.jupiter.api.Test;
 -import org.junit.jupiter.api.Timeout;
  import org.junit.jupiter.api.io.TempDir;
- import org.junit.jupiter.params.ParameterizedTest;
- import org.junit.jupiter.params.provider.Arguments;
- import org.junit.jupiter.params.provider.MethodSource;
- import org.mockito.MockedStatic;
  import org.mockito.Mockito;
- import org.mockito.invocation.InvocationOnMock;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
  /**
   * Unit tests for {@link KeyValueHandler}.
@@@ -165,28 -118,8 +126,9 @@@ public class TestKeyValueHandler 
    private HddsDispatcher dispatcher;
    private KeyValueHandler handler;
    private OzoneConfiguration conf;
+   private ContainerSet mockContainerSet;
 +  private long maxContainerSize;
  
-   /**
-    * Number of corrupt blocks and chunks.
-    */
-   public static Stream<Arguments> corruptionValues() {
-     return Stream.of(
-         Arguments.of(5, 0),
-         Arguments.of(0, 5),
-         Arguments.of(0, 10),
-         Arguments.of(10, 0),
-         Arguments.of(5, 10),
-         Arguments.of(10, 5),
-         Arguments.of(2, 3),
-         Arguments.of(3, 2),
-         Arguments.of(4, 6),
-         Arguments.of(6, 4),
-         Arguments.of(6, 9),
-         Arguments.of(9, 6)
-     );
-   }
- 
    @BeforeEach
    public void setup() throws IOException {
      // Create mock HddsDispatcher and KeyValueHandler.
@@@ -441,68 -389,6 +385,68 @@@
          "Close container should return Invalid container error");
    }
  
 +  @Test
 +  public void testCreateContainerWithFailure() throws Exception {
 +    final String testDir = tempDir.toString();
 +    final long containerID = 1L;
 +    final String clusterId = UUID.randomUUID().toString();
 +    final String datanodeId = UUID.randomUUID().toString();
-     final ConfigurationSource conf = new OzoneConfiguration();
++    conf = new OzoneConfiguration();
 +    final ContainerSet containerSet = spy(newContainerSet());
 +    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
 +
 +    HddsVolume hddsVolume = new HddsVolume.Builder(testDir).conf(conf)
 +        .clusterID(clusterId).datanodeUuid(datanodeId)
 +        .volumeSet(volumeSet)
 +        .build();
 +
 +    hddsVolume.format(clusterId);
 +    hddsVolume.createWorkingDir(clusterId, null);
 +    hddsVolume.createTmpDirs(clusterId);
 +
 +    when(volumeSet.getVolumesList())
 +        .thenReturn(Collections.singletonList(hddsVolume));
 +
 +    List<HddsVolume> hddsVolumeList = StorageVolumeUtil
 +        .getHddsVolumesList(volumeSet.getVolumesList());
 +
 +    assertEquals(1, hddsVolumeList.size());
 +
 +    final ContainerMetrics metrics = ContainerMetrics.create(conf);
 +
 +    final AtomicInteger icrReceived = new AtomicInteger(0);
 +
 +    final KeyValueHandler kvHandler = new KeyValueHandler(conf,
 +        datanodeId, containerSet, volumeSet, metrics,
 +        c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf));
 +    kvHandler.setClusterID(clusterId);
 +
 +    final ContainerCommandRequestProto createContainer =
 +        createContainerRequest(datanodeId, containerID);
 +
 +    Semaphore semaphore = new Semaphore(1);
 +    doAnswer(invocation -> {
 +      semaphore.acquire();
 +      throw new StorageContainerException(ContainerProtos.Result.IO_EXCEPTION);
 +    }).when(containerSet).addContainer(any());
 +
 +    semaphore.acquire();
 +    CompletableFuture.runAsync(() ->
 +        kvHandler.handleCreateContainer(createContainer, null)
 +    );
 +
 +    // Committed bytes have been allocated by the volumeChoosingPolicy, which is called in KeyValueContainer#create.
 +    GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == maxContainerSize,
 +            1000, 50000);
 +    semaphore.release();
 +
 +    LOG.info("Committed bytes: {}", hddsVolume.getCommittedBytes());
 +
 +    // release committed bytes as exception is thrown
 +    GenericTestUtils.waitFor(() -> hddsVolume.getCommittedBytes() == 0,
 +            1000, 50000);
 +  }
 +
    @Test
    public void testDeleteContainer() throws IOException {
      final String testDir = tempDir.toString();
@@@ -597,60 -484,6 +541,60 @@@
      }
    }
  
 +  /**
 +   * Tests that deleting a container decrements the cached used space of its volume.
 +   */
 +  @Test
 +  public void testDeleteDecrementsVolumeUsedSpace() throws IOException {
 +    final long containerID = 1;
 +    final String clusterId = UUID.randomUUID().toString();
 +    final String datanodeId = UUID.randomUUID().toString();
 +    final ContainerSet containerSet = newContainerSet();
 +    final MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);
 +    final HddsVolume hddsVolume = mock(HddsVolume.class);
 +    when(hddsVolume.getDeletedContainerDir()).thenReturn(new File(""));
 +
-     final ConfigurationSource conf = new OzoneConfiguration();
++    conf = new OzoneConfiguration();
 +    final ContainerMetrics metrics = ContainerMetrics.create(conf);
 +    final AtomicInteger icrReceived = new AtomicInteger(0);
 +    final long containerBytesUsed = 1024 * 1024;
 +
 +    // KeyValueHandler is under test here; all the other objects are mocked.
 +    final KeyValueHandler kvHandler = new KeyValueHandler(conf,
 +        datanodeId, containerSet, volumeSet, metrics,
 +        c -> icrReceived.incrementAndGet(), new ContainerChecksumTreeManager(conf));
 +    kvHandler.setClusterID(clusterId);
 +
 +    // Setup ContainerData and Container mocks
 +    KeyValueContainerData containerData = mock(KeyValueContainerData.class);
 +    when(containerData.getContainerID()).thenReturn(containerID);
 +    when(containerData.getVolume()).thenReturn(hddsVolume);
 +    when(containerData.getBytesUsed()).thenReturn(containerBytesUsed);
 +    when(containerData.getState()).thenReturn(ContainerProtos.ContainerDataProto.State.CLOSED);
 +    when(containerData.isOpen()).thenReturn(false);
 +    when(containerData.getLayoutVersion()).thenReturn(ContainerLayoutVersion.FILE_PER_BLOCK);
 +    when(containerData.getDbFile()).thenReturn(new File(tempDir.toFile(), "dummy.db"));
 +    when(containerData.getContainerPath()).thenReturn(tempDir.toString());
 +    when(containerData.getMetadataPath()).thenReturn(tempDir.toString());
 +
 +    KeyValueContainer container = mock(KeyValueContainer.class);
 +    when(container.getContainerData()).thenReturn(containerData);
 +    when(container.hasBlocks()).thenReturn(true);
 +
 +    containerSet.addContainer(container);
 +    assertNotNull(containerSet.getContainer(containerID));
 +
 +    // This is the method we're testing. It should decrement used space in the volume when deleting this container.
 +    kvHandler.deleteContainer(container, true);
 +    assertNull(containerSet.getContainer(containerID));
 +
 +    // Verify ICR was sent (once for delete)
 +    assertEquals(1, icrReceived.get(), "ICR should be sent for delete");
 +    verify(container, times(1)).delete();
 +    // Verify decrementUsedSpace was called with the correct amount
 +    verify(hddsVolume, times(1)).decrementUsedSpace(eq(containerBytesUsed));
 +  }
 +
    @ContainerLayoutTestInfo.ContainerTest
    public void testContainerChecksumInvocation(ContainerLayoutVersion layoutVersion) throws Exception {
      conf = new OzoneConfiguration();
diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
index de76af2555,6ea5ff0b12..4045b959d8
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOnDemandContainerDataScanner.java
@@@ -60,8 -60,9 +60,10 @@@ import org.mockito.stubbing.Answer
  @MockitoSettings(strictness = Strictness.LENIENT)
  public class TestOnDemandContainerDataScanner extends
      TestContainerScannersAbstract {
+   
+   private OnDemandContainerDataScanner onDemandScanner;
  
 +  @Override
    @BeforeEach
    public void setup() {
      super.setup();
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
index 67bf1ad194,0e2baeeeb3..6cba1e8f1c
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplica.java
@@@ -185,17 -177,19 +185,17 @@@ public final class ContainerReplica imp
  
    @Override
    public String toString() {
 -    return "ContainerReplica{" +
 -        "containerID=" + containerID +
 -        ", state=" + state +
 -        ", datanodeDetails=" + datanodeDetails +
 -        ", placeOfBirth=" + placeOfBirth +
 -        ", sequenceId=" + sequenceId +
 -        ", keyCount=" + keyCount +
 -        ", bytesUsed=" + bytesUsed + ((replicaIndex > 0) ?
 -        ",replicaIndex=" + replicaIndex :
 -        "") +
 -        ", isEmpty=" + isEmpty +
 -        ", dataChecksum=" + dataChecksum +
 -        '}';
 +    return "ContainerReplica{" + containerID
 +        + " (" + state
 +        + ") currentDN=" + datanodeDetails
 +        + (originDatanodeId != null ? ", originDN=" + originDatanodeId : " (origin)")
 +        + ", bcsid=" + sequenceId
 +        + (replicaIndex > 0 ? ", replicaIndex=" + replicaIndex : "")
 +        + ", keyCount=" + keyCount
 +        + ", bytesUsed=" + bytesUsed
 +        + ", " + (isEmpty ? "empty" : "non-empty")
-         + ", dataChecksum=" + dataChecksum +
++        + ", dataChecksum=" + dataChecksum
 +        + '}';
    }
  
    /**
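
For reference, the reworked toString() above renders along these lines; all values are illustrative and ContainerID's own formatting is assumed:

    ContainerReplica{#1001 (CLOSED) currentDN=<datanodeDetails>, originDN=<uuid>, bcsid=42, keyCount=15, bytesUsed=1048576, non-empty, dataChecksum=<checksum>}
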


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
