This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a7f06c1964 HDDS-3486. Recon cannot track missing containers that were created and went missing while it is down. (#3947)
a7f06c1964 is described below

commit a7f06c19642506828a5194d4306e2cc0de4d680a
Author: devmadhuu <[email protected]>
AuthorDate: Thu Dec 22 14:12:02 2022 +0530

    HDDS-3486. Recon cannot track missing containers that were created and went missing while it is down. (#3947)
---
 .../protocol/StorageContainerLocationProtocol.java |   7 +
 .../common/src/main/resources/ozone-default.xml    |  17 +++
 ...inerLocationProtocolClientSideTranslatorPB.java |  20 +++
 .../src/main/proto/ScmAdminProtocol.proto          |   1 +
 .../hdds/scm/container/ContainerManager.java       |  18 +++
 .../hdds/scm/container/ContainerManagerImpl.java   |  21 +++
 ...inerLocationProtocolServerSideTranslatorPB.java |  17 +++
 .../hdds/scm/server/SCMClientProtocolServer.java   |  14 ++
 hadoop-ozone/dev-support/intellij/ozone-site.xml   |  20 +++
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |  36 ++++++
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |  12 ++
 .../ozone/recon/api/ClusterStateEndpoint.java      |   6 +-
 .../ozone/recon/fsck/ContainerHealthTask.java      |  18 ++-
 .../scm/ReconStorageContainerManagerFacade.java    | 141 ++++++++++++++++++++-
 .../recon/spi/StorageContainerServiceProvider.java |  21 +++
 .../impl/StorageContainerServiceProviderImpl.java  |  15 +++
 .../TestContainerHealthTaskRecordGenerator.java    |  42 ++++--
 17 files changed, 397 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 34bd2748f6..f06066b2aa 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -426,4 +426,11 @@ public interface StorageContainerLocationProtocol extends 
Closeable {
   Token<?> getContainerToken(ContainerID containerID) throws IOException;
 
   long getContainerCount() throws IOException;
+
+  long getContainerCount(HddsProtos.LifeCycleState state)
+      throws IOException;
+
+  List<ContainerInfo> getListOfContainers(
+      long startContainerID, int count, HddsProtos.LifeCycleState state)
+      throws IOException;
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6aa269c89d..35fb3d7c23 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3498,4 +3498,21 @@
       To enable/disable filesystem write via ratis streaming.
     </description>
   </property>
+
+  <property>
+    <name>ozone.recon.scm.snapshot.task.initial.delay</name>
+    <value>1m</value>
+    <tag>OZONE, MANAGEMENT, RECON</tag>
+    <description>
+      Initial delay before Recon starts its periodic container info sync
+      with SCM. The value takes a time unit suffix (for example, 1m).
+    </description>
+  </property>
+  <property>
+    <name>ozone.recon.scm.snapshot.task.interval.delay</name>
+    <value>24h</value>
+    <tag>OZONE, MANAGEMENT, RECON</tag>
+    <description>
+      Delay between successive runs of Recon's periodic container info sync
+      with SCM. The value takes a time unit suffix (for example, 24h).
+    </description>
+  </property>
 </configuration>
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 488d970cf2..c973eb9809 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -1018,6 +1018,19 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
     return response.getContainerCount();
   }
 
+  @Override
+  public long getContainerCount(HddsProtos.LifeCycleState state)
+      throws IOException {
+    GetContainerCountRequestProto request =
+        GetContainerCountRequestProto.newBuilder().build();
+
+    GetContainerCountResponseProto response =
+        submitRequest(Type.GetClosedContainerCount,
+            builder -> builder.setGetContainerCountRequest(request))
+            .getGetContainerCountResponse();
+    return response.getContainerCount();
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;
@@ -1027,4 +1040,11 @@ public final class 
StorageContainerLocationProtocolClientSideTranslatorPB
   public void close() {
     RPC.stopProxy(rpcProxy);
   }
+
+  @Override
+  public List<ContainerInfo> getListOfContainers(
+      long startContainerID, int count, HddsProtos.LifeCycleState state)
+      throws IOException {
+    return listContainer(startContainerID, count, state);
+  }
 }
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto 
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index ccb5e2155e..da7c6e15bd 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -174,6 +174,7 @@ enum Type {
   GetContainerReplicas = 34;
   GetReplicationManagerReport = 35;
   ResetDeletedBlockRetryCount = 36;
+  GetClosedContainerCount = 37;
 }
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index fb171677f0..a093775067 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -80,6 +80,24 @@ public interface ContainerManager extends Closeable {
    */
   List<ContainerInfo> getContainers(LifeCycleState state);
 
+  /**
+   * Returns containers under certain conditions.
+   * Searches container IDs starting from the start ID (inclusive).
+   * The number of containers returned cannot exceed the
+   * value of count.
+   *
+   * @param startID start containerID, >=0,
+   * start searching at the head if 0.
+   * @param count count must be >= 0
+   *              Usually the count will be replaced with a very big
+   *              value instead of being unlimited in case the db is very big.
+   * @param state container state
+   *
+   * @return a list of containers.
+   */
+  List<ContainerInfo> getContainers(ContainerID startID,
+                                    int count, LifeCycleState state);
+
   /**
    * Returns the size of containers which are in the specified state.
    *
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 9314d07de3..a3281efcf2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -178,6 +178,27 @@ public class ContainerManagerImpl implements 
ContainerManager {
     return containers;
   }
 
+  @Override
+  public List<ContainerInfo> getContainers(final ContainerID startID,
+                                           final int count,
+                                           final LifeCycleState state) {
+    scmContainerManagerMetrics.incNumListContainersOps();
+    final List<ContainerID> containersIds =
+        new ArrayList<>(containerStateManager.getContainerIDs(state));
+    Collections.sort(containersIds);
+    List<ContainerInfo> containers;
+    lock.lock();
+    try {
+      containers = containersIds.stream()
+          .filter(id -> id.compareTo(startID) >= 0).limit(count)
+          .map(containerStateManager::getContainer)
+          .collect(Collectors.toList());
+    } finally {
+      lock.unlock();
+    }
+    return containers;
+  }
+
   @Override
   public int getContainerStateCount(final LifeCycleState state) {
     return containerStateManager.getContainerIDs(state).size();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index e59c984174..617d17da1d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -640,6 +640,13 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
           .setGetContainerCountResponse(getContainerCount(
                   request.getGetContainerCountRequest()))
           .build();
+      case GetClosedContainerCount:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetContainerCountResponse(getClosedContainerCount(
+                request.getGetContainerCountRequest()))
+            .build();
       case GetContainerReplicas:
         return ScmContainerLocationResponse.newBuilder()
           .setCmdType(request.getCmdType())
@@ -1149,6 +1156,16 @@ public final class 
StorageContainerLocationProtocolServerSideTranslatorPB
       .build();
   }
 
+  public GetContainerCountResponseProto getClosedContainerCount(
+      StorageContainerLocationProtocolProtos.GetContainerCountRequestProto
+          request) throws IOException {
+
+    return GetContainerCountResponseProto.newBuilder()
+        .setContainerCount(impl.getContainerCount(
+            HddsProtos.LifeCycleState.CLOSED))
+        .build();
+  }
+
   public ResetDeletedBlockRetryCountResponseProto
       getResetDeletedBlockRetryCount(ResetDeletedBlockRetryCountRequestProto
       request) throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 21d179b59e..0a7eeb81e5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1123,6 +1123,20 @@ public class SCMClientProtocolServer implements
     return scm.getContainerManager().getContainers().size();
   }
 
+  @Override
+  public long getContainerCount(HddsProtos.LifeCycleState state)
+      throws IOException {
+    return scm.getContainerManager().getContainers(state).size();
+  }
+
+  @Override
+  public List<ContainerInfo> getListOfContainers(
+      long startContainerID, int count, HddsProtos.LifeCycleState state)
+      throws IOException {
+    return scm.getContainerManager().getContainers(
+        ContainerID.valueOf(startContainerID), count, state);
+  }
+
   /**
    * Queries a list of Node that match a set of statuses.
    *
diff --git a/hadoop-ozone/dev-support/intellij/ozone-site.xml 
b/hadoop-ozone/dev-support/intellij/ozone-site.xml
index 4eed6fd84e..2024fcf949 100644
--- a/hadoop-ozone/dev-support/intellij/ozone-site.xml
+++ b/hadoop-ozone/dev-support/intellij/ozone-site.xml
@@ -71,4 +71,24 @@
     <name>datanode.replication.port</name>
     <value>0</value>
   </property>
+  <property>
+    <name>ozone.scm.ratis.enable</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>hdds.container.report.interval</name>
+    <value>60m</value>
+  </property>
+  <property>
+    <name>hdds.recon.heartbeat.interval</name>
+    <value>60s</value>
+  </property>
+  <property>
+    <name>ozone.scm.stale.node.interval</name>
+    <value>5m</value>
+  </property>
+  <property>
+    <name>ozone.scm.dead.node.interval</name>
+    <value>10m</value>
+  </property>
 </configuration>
\ No newline at end of file
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index abeb1f189c..18373d1789 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -89,6 +90,41 @@ public class TestReconTasks {
     }
   }
 
+  @Test
+  public void testSyncSCMContainerInfo() throws Exception {
+    ReconStorageContainerManagerFacade reconScm =
+        (ReconStorageContainerManagerFacade)
+            cluster.getReconServer().getReconStorageContainerManager();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    ContainerManager scmContainerManager = scm.getContainerManager();
+    ContainerManager reconContainerManager = reconScm.getContainerManager();
+    final ContainerInfo container1 = scmContainerManager.allocateContainer(
+        RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.ONE), "admin");
+    final ContainerInfo container2 = scmContainerManager.allocateContainer(
+        RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.ONE), "admin");
+    reconContainerManager.allocateContainer(
+        RatisReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.ONE), "admin");
+    scmContainerManager.updateContainerState(container1.containerID(),
+        HddsProtos.LifeCycleEvent.FINALIZE);
+    scmContainerManager.updateContainerState(container2.containerID(),
+        HddsProtos.LifeCycleEvent.FINALIZE);
+    scmContainerManager.updateContainerState(container1.containerID(),
+        HddsProtos.LifeCycleEvent.CLOSE);
+    scmContainerManager.updateContainerState(container2.containerID(),
+        HddsProtos.LifeCycleEvent.CLOSE);
+    int scmContainersCount = scmContainerManager.getContainers().size();
+    int reconContainersCount = reconContainerManager
+        .getContainers().size();
+    Assert.assertNotEquals(scmContainersCount, reconContainersCount);
+    reconScm.syncWithSCMContainerInfo();
+    reconContainersCount = reconContainerManager
+        .getContainers().size();
+    Assert.assertEquals(scmContainersCount, reconContainersCount);
+  }
+
   @Test
   public void testMissingContainerDownNode() throws Exception {
     ReconStorageContainerManagerFacade reconScm =
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index c9e31563d3..2c97a0dfd3 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -149,6 +149,18 @@ public final class  ReconServerConfigKeys {
 
   public static final long
       OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+
+  public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY =
+      "ozone.recon.scm.snapshot.task.interval.delay";
+
+  public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT
+      = "24h";
+
+  public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY =
+      "ozone.recon.scm.snapshot.task.initial.delay";
+
+  public static final String
+      OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
   /**
    * Private constructor for utility class.
    */
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 58a60296b0..ba3e28d429 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -91,13 +91,13 @@ public class ClusterStateEndpoint {
     List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
     int containers = this.containerManager.getContainers().size();
     int pipelines = this.pipelineManager.getPipelines().size();
-    List<UnhealthyContainers> unhealthyContainers = 
containerHealthSchemaManager
+    List<UnhealthyContainers> missingContainers = containerHealthSchemaManager
         .getUnhealthyContainers(
             ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
             0, MISSING_CONTAINER_COUNT_LIMIT);
-    int totalMissingContainerCount = unhealthyContainers.size() ==
+    int totalMissingContainerCount = missingContainers.size() ==
         MISSING_CONTAINER_COUNT_LIMIT ?
-        MISSING_CONTAINER_COUNT_LIMIT : unhealthyContainers.size();
+        MISSING_CONTAINER_COUNT_LIMIT : missingContainers.size();
     int openContainersCount = this.containerManager.getContainerStateCount(
         HddsProtos.LifeCycleState.OPEN);
     int healthyDatanodes =
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 59ca2ffd1a..606eb0a062 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -49,6 +49,7 @@ import org.jooq.Cursor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Class that scans the list of containers and keeps track of containers with
  * no replicas in a SQL table.
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
   private ContainerHealthSchemaManager containerHealthSchemaManager;
   private PlacementPolicy placementPolicy;
   private final long interval;
+
   private Set<ContainerInfo> processedContainers = new HashSet<>();
 
   public ContainerHealthTask(
@@ -172,7 +174,8 @@ public class ContainerHealthTask extends ReconScmTask {
             currentContainer = setCurrentContainer(rec.getContainerId());
           }
           if (ContainerHealthRecords
-              .retainOrUpdateRecord(currentContainer, rec)) {
+              .retainOrUpdateRecord(currentContainer, rec
+              )) {
             // Check if the missing container is deleted in SCM
             if (currentContainer.isMissing() &&
                 containerDeletedInSCM(currentContainer.getContainer())) {
@@ -265,10 +268,13 @@ public class ContainerHealthTask extends ReconScmTask {
      * If the record is to be retained, the fields in the record for actual
      * replica count, delta and reason will be updated if their counts have
      * changed.
-     * @param container ContainerHealthStatus representing the health state of
-     *                  the container.
-     * @param rec Existing database record from the UnhealthyContainers table.
-     * @return
+     *
+     * @param container ContainerHealthStatus representing the
+     *                  health state of the container.
+     * @param rec       Existing database record from the
+     *                  UnhealthyContainers table.
+     * @return true or false, indicating whether the unhealthy container
+     * record needs to be retained or updated
      */
     public static boolean retainOrUpdateRecord(
         ContainerHealthStatus container, UnhealthyContainersRecord rec) {
@@ -342,6 +348,7 @@ public class ContainerHealthTask extends ReconScmTask {
         records.add(recordForState(
             container, UnHealthyContainerStates.MIS_REPLICATED, time));
       }
+
       return records;
     }
 
@@ -434,5 +441,4 @@ public class ContainerHealthTask extends ReconScmTask {
       }
     }
   }
-
 }
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index b93e04cbdd..3276b99ac5 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -29,20 +29,28 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmUtils;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
@@ -93,6 +101,11 @@ import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EX
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static 
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY;
+
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import 
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
@@ -112,6 +125,7 @@ public class ReconStorageContainerManagerFacade
 
   private static final Logger LOG = LoggerFactory
       .getLogger(ReconStorageContainerManagerFacade.class);
+  public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L;
 
   private final OzoneConfiguration ozoneConfiguration;
   private final ReconDatanodeProtocolServer datanodeProtocolServer;
@@ -133,6 +147,10 @@ public class ReconStorageContainerManagerFacade
   private PlacementPolicy containerPlacementPolicy;
   private HDDSLayoutVersionManager scmLayoutVersionManager;
 
+  private ScheduledExecutorService scheduler;
+
+  private AtomicBoolean isSyncDataFromSCMRunning;
+
   @Inject
   public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
       StorageContainerServiceProvider scmServiceProvider,
@@ -185,6 +203,7 @@ public class ReconStorageContainerManagerFacade
         containerHealthSchemaManager, reconContainerMetadataManager,
         scmhaManager, sequenceIdGen, pendingOps);
     this.scmServiceProvider = scmServiceProvider;
+    this.isSyncDataFromSCMRunning = new AtomicBoolean();
 
     NodeReportHandler nodeReportHandler =
         new NodeReportHandler(nodeManager);
@@ -326,6 +345,7 @@ public class ReconStorageContainerManagerFacade
           "Recon ScmDatanodeProtocol RPC server",
           getDatanodeProtocolServer().getDatanodeRpcAddress()));
     }
+    scheduler = Executors.newScheduledThreadPool(1);
     boolean isSCMSnapshotEnabled = ozoneConfiguration.getBoolean(
         ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED,
         ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED_DEFAULT);
@@ -335,6 +355,35 @@ public class ReconStorageContainerManagerFacade
     } else {
       initializePipelinesFromScm();
     }
+    LOG.debug("Started the SCM Container Info sync scheduler.");
+    long interval = ozoneConfiguration.getTimeDuration(
+        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY,
+        OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+    long initialDelay = ozoneConfiguration.getTimeDuration(
+        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY,
+        OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    // This periodic sync with the SCM container cache is needed because a
+    // container that is newly added and goes missing while Recon is down
+    // will not be reported as missing by Recon until the difference in
+    // container count between the SCM container cache and the Recon
+    // container cache reaches the threshold value defined in
+    // "ozone.recon.scm.container.threshold".
+    scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        boolean isSuccess = syncWithSCMContainerInfo();
+        if (!isSuccess) {
+          LOG.debug("SCM container info sync is already running.");
+        }
+      } catch (Throwable t) {
+        LOG.error("Unexpected exception while syncing data from SCM.", t);
+      } finally {
+        isSyncDataFromSCMRunning.compareAndSet(true, false);
+      }
+    },
+        initialDelay,
+        interval,
+        TimeUnit.MILLISECONDS);
     getDatanodeProtocolServer().start();
     this.reconScmTasks.forEach(ReconScmTask::start);
   }
@@ -416,22 +465,102 @@ public class ReconStorageContainerManagerFacade
       }
     } catch (IOException e) {
       LOG.error("Exception encountered while getting SCM DB.");
+    } finally {
+      isSyncDataFromSCMRunning.compareAndSet(true, false);
     }
   }
 
   public void updateReconSCMDBWithNewSnapshot() throws IOException {
-    DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
-    if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
-      LOG.info("Got new checkpoint from SCM : " +
-          dbSnapshot.getCheckpointLocation());
+    if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+      DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+      if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+        LOG.info("Got new checkpoint from SCM : " +
+            dbSnapshot.getCheckpointLocation());
+        try {
+          initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+        } catch (IOException e) {
+          LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+        }
+      } else {
+        LOG.error("Null snapshot location got from SCM.");
+      }
+    } else {
+      LOG.warn("SCM DB sync is already running.");
+    }
+  }
+
+  public boolean syncWithSCMContainerInfo()
+      throws IOException {
+    if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
       try {
-        initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+        List<ContainerInfo> containers = containerManager.getContainers();
+
+        long totalContainerCount = scmServiceProvider.getContainerCount(
+            HddsProtos.LifeCycleState.CLOSED);
+        long containerCountPerCall =
+            getContainerCountPerCall(totalContainerCount);
+        long startContainerId = 1;
+        long retrievedContainerCount = 0;
+        if (totalContainerCount > 0) {
+          while (retrievedContainerCount < totalContainerCount) {
+            List<ContainerInfo> listOfContainers = scmServiceProvider.
+                getListOfContainers(startContainerId,
+                    Long.valueOf(containerCountPerCall).intValue(),
+                    HddsProtos.LifeCycleState.CLOSED);
+            if (null != listOfContainers && listOfContainers.size() > 0) {
+              LOG.info("Got list of containers from SCM : " +
+                  listOfContainers.size());
+              listOfContainers.forEach(containerInfo -> {
+                long containerID = containerInfo.getContainerID();
+                boolean isContainerPresentAtRecon =
+                    containers.contains(containerInfo);
+                if (!isContainerPresentAtRecon) {
+                  try {
+                    ContainerWithPipeline containerWithPipeline =
+                        scmServiceProvider.getContainerWithPipeline(
+                            containerID);
+                    containerManager.addNewContainer(containerWithPipeline);
+                  } catch (IOException e) {
+                    LOG.error("Could not get container with pipeline " +
+                        "for container : {}", containerID);
+                  } catch (TimeoutException e) {
+                    LOG.error("Could not add new container {} in Recon " +
+                        "container manager cache.", containerID);
+                  }
+                }
+              });
+              startContainerId = listOfContainers.get(
+                  listOfContainers.size() - 1).getContainerID() + 1;
+            } else {
+              LOG.info("No containers found at SCM in CLOSED state");
+              return false;
+            }
+            retrievedContainerCount += containerCountPerCall;
+          }
+        }
       } catch (IOException e) {
         LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+        return false;
       }
     } else {
-      LOG.error("Null snapshot location got from SCM.");
+      LOG.debug("SCM DB sync is already running.");
+      return false;
     }
+    return true;
+  }
+
+  private long getContainerCountPerCall(long totalContainerCount) {
+    // Assumption of size of 1 container info object here is 1 MB
+    long containersMetaDataTotalRpcRespSizeMB =
+        CONTAINER_METADATA_SIZE * totalContainerCount;
+    long hadoopRPCSize = ozoneConfiguration.getInt(
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+        CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+    long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <=
+        hadoopRPCSize ? totalContainerCount :
+        Math.round(Math.floor(
+            hadoopRPCSize / (double) CONTAINER_METADATA_SIZE));
+    return containerCountPerCall;
   }
 
   private void deleteOldSCMDB() throws IOException {
diff --git 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
index cf57937a15..2ee3f6bbd7 100644
--- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
+++ 
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -78,4 +79,24 @@ public interface StorageContainerServiceProvider {
    * @return DBCheckpoint from SCM.
    */
   DBCheckpoint getSCMDBSnapshot();
+
+  /**
+   * Get the list of containers from SCM. This is an RPC call.
+   *
+   * @param startContainerID the container ID from which the listing starts
+   * @param count the number of containers to return
+   * @param state only containers in this lifecycle state are returned
+   * @return the list of containers from SCM in the given state
+   * @throws IOException if the RPC to SCM fails
+   */
+  List<ContainerInfo> getListOfContainers(long startContainerID,
+                                          int count,
+                                          HddsProtos.LifeCycleState state)
+      throws IOException;
+
+  /**
+   * Requests SCM for the count of containers in the given {@code state}.
+   * @return number of containers in SCM that are in the given state.
+   */
+  long getContainerCount(HddsProtos.LifeCycleState state) throws IOException;
 }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
index 1550a89cd4..d4ceaec89f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
@@ -155,6 +156,12 @@ public class StorageContainerServiceProviderImpl
     return scmClient.getContainerCount();
   }
 
+  @Override
+  public long getContainerCount(
+      HddsProtos.LifeCycleState state) throws IOException {
+    return scmClient.getContainerCount(state); // delegated to the SCM client
+  }
+
   public String getScmDBSnapshotUrl() {
     return scmDBSnapshotUrl;
   }
@@ -215,4 +222,12 @@ public class StorageContainerServiceProviderImpl
     }
     return null;
   }
+
+  @Override
+  public List<ContainerInfo> getListOfContainers(long startContainerID,
+      int count, HddsProtos.LifeCycleState state) throws IOException {
+    // Pass the paging window and state filter to the SCM client unchanged.
+    return scmClient.getListOfContainers(startContainerID, count, state);
+  }
+
 }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
index c88c2440f9..4e86ca9056 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
@@ -76,20 +76,25 @@ public class TestContainerHealthTaskRecordGenerator {
         new ContainerHealthStatus(container, replicas, placementPolicy);
     // Missing record should be retained
     assertTrue(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, missingRecord()));
+        .retainOrUpdateRecord(status, missingRecord()
+        ));
     // Under / Over / Mis replicated should not be retained as if a container is
     // missing then it is not in any other category.
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, underReplicatedRecord()));
+        .retainOrUpdateRecord(status, underReplicatedRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, overReplicatedRecord()));
+        .retainOrUpdateRecord(status, overReplicatedRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, misReplicatedRecord()));
+        .retainOrUpdateRecord(status, misReplicatedRecord()
+        ));
 
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
     status = new ContainerHealthStatus(container, replicas, placementPolicy);
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, missingRecord()));
+        .retainOrUpdateRecord(status, missingRecord()
+        ));
   }
 
   @Test
@@ -109,11 +114,14 @@ public class TestContainerHealthTaskRecordGenerator {
 
     // Missing / Over / Mis replicated should not be retained
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, missingRecord()));
+        .retainOrUpdateRecord(status, missingRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, overReplicatedRecord()));
+        .retainOrUpdateRecord(status, overReplicatedRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, misReplicatedRecord()));
+        .retainOrUpdateRecord(status, misReplicatedRecord()
+        ));
 
     // Container is now replicated OK - should be removed.
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
@@ -139,11 +147,14 @@ public class TestContainerHealthTaskRecordGenerator {
 
     // Missing / Over / Mis replicated should not be retained
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, missingRecord()));
+        .retainOrUpdateRecord(status, missingRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, underReplicatedRecord()));
+        .retainOrUpdateRecord(status, underReplicatedRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, misReplicatedRecord()));
+        .retainOrUpdateRecord(status, misReplicatedRecord()
+        ));
 
     // Container is now replicated OK - should be removed.
     replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
@@ -173,11 +184,14 @@ public class TestContainerHealthTaskRecordGenerator {
 
     // Missing / Over / Mis replicated should not be retained
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, missingRecord()));
+        .retainOrUpdateRecord(status, missingRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, underReplicatedRecord()));
+        .retainOrUpdateRecord(status, underReplicatedRecord()
+        ));
     assertFalse(ContainerHealthTask.ContainerHealthRecords
-        .retainOrUpdateRecord(status, overReplicatedRecord()));
+        .retainOrUpdateRecord(status, overReplicatedRecord()
+        ));
 
     // Container is now placed OK - should be removed.
     when(placementPolicy.validateContainerPlacement(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to