This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a7f06c1964 HDDS-3486. Recon cannot track missing containers that were
created and went missing while it is down. (#3947)
a7f06c1964 is described below
commit a7f06c19642506828a5194d4306e2cc0de4d680a
Author: devmadhuu <[email protected]>
AuthorDate: Thu Dec 22 14:12:02 2022 +0530
HDDS-3486. Recon cannot track missing containers that were created and went
missing while it is down. (#3947)
---
.../protocol/StorageContainerLocationProtocol.java | 7 +
.../common/src/main/resources/ozone-default.xml | 17 +++
...inerLocationProtocolClientSideTranslatorPB.java | 20 +++
.../src/main/proto/ScmAdminProtocol.proto | 1 +
.../hdds/scm/container/ContainerManager.java | 18 +++
.../hdds/scm/container/ContainerManagerImpl.java | 21 +++
...inerLocationProtocolServerSideTranslatorPB.java | 17 +++
.../hdds/scm/server/SCMClientProtocolServer.java | 14 ++
hadoop-ozone/dev-support/intellij/ozone-site.xml | 20 +++
.../apache/hadoop/ozone/recon/TestReconTasks.java | 36 ++++++
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 12 ++
.../ozone/recon/api/ClusterStateEndpoint.java | 6 +-
.../ozone/recon/fsck/ContainerHealthTask.java | 18 ++-
.../scm/ReconStorageContainerManagerFacade.java | 141 ++++++++++++++++++++-
.../recon/spi/StorageContainerServiceProvider.java | 21 +++
.../impl/StorageContainerServiceProviderImpl.java | 15 +++
.../TestContainerHealthTaskRecordGenerator.java | 42 ++++--
17 files changed, 397 insertions(+), 29 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
index 34bd2748f6..f06066b2aa 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java
@@ -426,4 +426,11 @@ public interface StorageContainerLocationProtocol extends
Closeable {
Token<?> getContainerToken(ContainerID containerID) throws IOException;
long getContainerCount() throws IOException;
+
+ long getContainerCount(HddsProtos.LifeCycleState state)
+ throws IOException;
+
+ List<ContainerInfo> getListOfContainers(
+ long startContainerID, int count, HddsProtos.LifeCycleState state)
+ throws IOException;
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 6aa269c89d..35fb3d7c23 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3498,4 +3498,21 @@
To enable/disable filesystem write via ratis streaming.
</description>
</property>
+
+ <property>
+ <name>ozone.recon.scm.snapshot.task.initial.delay</name>
+ <value>1m</value>
+ <tag>OZONE, MANAGEMENT, RECON</tag>
+ <description>
Initial delay before Recon requests an SCM DB snapshot. Accepts a time duration (e.g. 1m, 30s).
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.scm.snapshot.task.interval.delay</name>
+ <value>24h</value>
+ <tag>OZONE, MANAGEMENT, RECON</tag>
+ <description>
Interval at which Recon periodically requests an SCM DB snapshot. Accepts a time duration (e.g. 24h).
+ </description>
+ </property>
</configuration>
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 488d970cf2..c973eb9809 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -1018,6 +1018,19 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
return response.getContainerCount();
}
+ @Override
+ public long getContainerCount(HddsProtos.LifeCycleState state)
+ throws IOException {
+ GetContainerCountRequestProto request =
+ GetContainerCountRequestProto.newBuilder().build();
+
+ GetContainerCountResponseProto response =
+ submitRequest(Type.GetClosedContainerCount,
+ builder -> builder.setGetContainerCountRequest(request))
+ .getGetContainerCountResponse();
+ return response.getContainerCount();
+ }
+
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
@@ -1027,4 +1040,11 @@ public final class
StorageContainerLocationProtocolClientSideTranslatorPB
public void close() {
RPC.stopProxy(rpcProxy);
}
+
+ @Override
+ public List<ContainerInfo> getListOfContainers(
+ long startContainerID, int count, HddsProtos.LifeCycleState state)
+ throws IOException {
+ return listContainer(startContainerID, count, state);
+ }
}
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index ccb5e2155e..da7c6e15bd 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -174,6 +174,7 @@ enum Type {
GetContainerReplicas = 34;
GetReplicationManagerReport = 35;
ResetDeletedBlockRetryCount = 36;
+ GetClosedContainerCount = 37;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index fb171677f0..a093775067 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -80,6 +80,24 @@ public interface ContainerManager extends Closeable {
*/
List<ContainerInfo> getContainers(LifeCycleState state);
+ /**
+ * Returns containers under certain conditions.
+ * Search container IDs from start ID(exclusive),
+ * The max size of the searching range cannot exceed the
+ * value of count.
+ *
+ * @param startID start containerID, >=0,
+ * start searching at the head if 0.
+ * @param count count must be >= 0
+ * Usually the count will be replaced with a very big
+ * value instead of being unlimited in case the db is very big.
+ * @param state container state
+ *
+ * @return a list of containers.
+ */
+ List<ContainerInfo> getContainers(ContainerID startID,
+ int count, LifeCycleState state);
+
/**
* Returns the size of containers which are in the specified state.
*
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index 9314d07de3..a3281efcf2 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -178,6 +178,27 @@ public class ContainerManagerImpl implements
ContainerManager {
return containers;
}
+ @Override
+ public List<ContainerInfo> getContainers(final ContainerID startID,
+ final int count,
+ final LifeCycleState state) {
+ scmContainerManagerMetrics.incNumListContainersOps();
+ final List<ContainerID> containersIds =
+ new ArrayList<>(containerStateManager.getContainerIDs(state));
+ Collections.sort(containersIds);
+ List<ContainerInfo> containers;
+ lock.lock();
+ try {
+ containers = containersIds.stream()
+ .filter(id -> id.compareTo(startID) >= 0).limit(count)
+ .map(containerStateManager::getContainer)
+ .collect(Collectors.toList());
+ } finally {
+ lock.unlock();
+ }
+ return containers;
+ }
+
@Override
public int getContainerStateCount(final LifeCycleState state) {
return containerStateManager.getContainerIDs(state).size();
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index e59c984174..617d17da1d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -640,6 +640,13 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.setGetContainerCountResponse(getContainerCount(
request.getGetContainerCountRequest()))
.build();
+ case GetClosedContainerCount:
+ return ScmContainerLocationResponse.newBuilder()
+ .setCmdType(request.getCmdType())
+ .setStatus(Status.OK)
+ .setGetContainerCountResponse(getClosedContainerCount(
+ request.getGetContainerCountRequest()))
+ .build();
case GetContainerReplicas:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
@@ -1149,6 +1156,16 @@ public final class
StorageContainerLocationProtocolServerSideTranslatorPB
.build();
}
+ public GetContainerCountResponseProto getClosedContainerCount(
+ StorageContainerLocationProtocolProtos.GetContainerCountRequestProto
+ request) throws IOException {
+
+ return GetContainerCountResponseProto.newBuilder()
+ .setContainerCount(impl.getContainerCount(
+ HddsProtos.LifeCycleState.CLOSED))
+ .build();
+ }
+
public ResetDeletedBlockRetryCountResponseProto
getResetDeletedBlockRetryCount(ResetDeletedBlockRetryCountRequestProto
request) throws IOException {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 21d179b59e..0a7eeb81e5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -1123,6 +1123,20 @@ public class SCMClientProtocolServer implements
return scm.getContainerManager().getContainers().size();
}
+ @Override
+ public long getContainerCount(HddsProtos.LifeCycleState state)
+ throws IOException {
+ return scm.getContainerManager().getContainers(state).size();
+ }
+
+ @Override
+ public List<ContainerInfo> getListOfContainers(
+ long startContainerID, int count, HddsProtos.LifeCycleState state)
+ throws IOException {
+ return scm.getContainerManager().getContainers(
+ ContainerID.valueOf(startContainerID), count, state);
+ }
+
/**
* Queries a list of Node that match a set of statuses.
*
diff --git a/hadoop-ozone/dev-support/intellij/ozone-site.xml
b/hadoop-ozone/dev-support/intellij/ozone-site.xml
index 4eed6fd84e..2024fcf949 100644
--- a/hadoop-ozone/dev-support/intellij/ozone-site.xml
+++ b/hadoop-ozone/dev-support/intellij/ozone-site.xml
@@ -71,4 +71,24 @@
<name>datanode.replication.port</name>
<value>0</value>
</property>
+ <property>
+ <name>ozone.scm.ratis.enable</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>hdds.container.report.interval</name>
+ <value>60m</value>
+ </property>
+ <property>
+ <name>hdds.recon.heartbeat.interval</name>
+ <value>60s</value>
+ </property>
+ <property>
+ <name>ozone.scm.stale.node.interval</name>
+ <value>5m</value>
+ </property>
+ <property>
+ <name>ozone.scm.dead.node.interval</name>
+ <value>10m</value>
+ </property>
</configuration>
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
index abeb1f189c..18373d1789 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasks.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -89,6 +90,41 @@ public class TestReconTasks {
}
}
+ @Test
+ public void testSyncSCMContainerInfo() throws Exception {
+ ReconStorageContainerManagerFacade reconScm =
+ (ReconStorageContainerManagerFacade)
+ cluster.getReconServer().getReconStorageContainerManager();
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ ContainerManager scmContainerManager = scm.getContainerManager();
+ ContainerManager reconContainerManager = reconScm.getContainerManager();
+ final ContainerInfo container1 = scmContainerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE), "admin");
+ final ContainerInfo container2 = scmContainerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE), "admin");
+ reconContainerManager.allocateContainer(
+ RatisReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE), "admin");
+ scmContainerManager.updateContainerState(container1.containerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ scmContainerManager.updateContainerState(container2.containerID(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ scmContainerManager.updateContainerState(container1.containerID(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ scmContainerManager.updateContainerState(container2.containerID(),
+ HddsProtos.LifeCycleEvent.CLOSE);
+ int scmContainersCount = scmContainerManager.getContainers().size();
+ int reconContainersCount = reconContainerManager
+ .getContainers().size();
+ Assert.assertNotEquals(scmContainersCount, reconContainersCount);
+ reconScm.syncWithSCMContainerInfo();
+ reconContainersCount = reconContainerManager
+ .getContainers().size();
+ Assert.assertEquals(scmContainersCount, reconContainersCount);
+ }
+
@Test
public void testMissingContainerDownNode() throws Exception {
ReconStorageContainerManagerFacade reconScm =
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index c9e31563d3..2c97a0dfd3 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -149,6 +149,18 @@ public final class ReconServerConfigKeys {
public static final long
OZONE_RECON_NSSUMMARY_FLUSH_TO_DB_MAX_THRESHOLD_DEFAULT = 150 * 1000L;
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY =
+ "ozone.recon.scm.snapshot.task.interval.delay";
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT
+ = "24h";
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY =
+ "ozone.recon.scm.snapshot.task.initial.delay";
+
+ public static final String
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
/**
* Private constructor for utility class.
*/
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
index 58a60296b0..ba3e28d429 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ClusterStateEndpoint.java
@@ -91,13 +91,13 @@ public class ClusterStateEndpoint {
List<DatanodeDetails> datanodeDetails = nodeManager.getAllNodes();
int containers = this.containerManager.getContainers().size();
int pipelines = this.pipelineManager.getPipelines().size();
- List<UnhealthyContainers> unhealthyContainers =
containerHealthSchemaManager
+ List<UnhealthyContainers> missingContainers = containerHealthSchemaManager
.getUnhealthyContainers(
ContainerSchemaDefinition.UnHealthyContainerStates.MISSING,
0, MISSING_CONTAINER_COUNT_LIMIT);
- int totalMissingContainerCount = unhealthyContainers.size() ==
+ int totalMissingContainerCount = missingContainers.size() ==
MISSING_CONTAINER_COUNT_LIMIT ?
- MISSING_CONTAINER_COUNT_LIMIT : unhealthyContainers.size();
+ MISSING_CONTAINER_COUNT_LIMIT : missingContainers.size();
int openContainersCount = this.containerManager.getContainerStateCount(
HddsProtos.LifeCycleState.OPEN);
int healthyDatanodes =
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
index 59ca2ffd1a..606eb0a062 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java
@@ -49,6 +49,7 @@ import org.jooq.Cursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Class that scans the list of containers and keeps track of containers with
* no replicas in a SQL table.
@@ -65,6 +66,7 @@ public class ContainerHealthTask extends ReconScmTask {
private ContainerHealthSchemaManager containerHealthSchemaManager;
private PlacementPolicy placementPolicy;
private final long interval;
+
private Set<ContainerInfo> processedContainers = new HashSet<>();
public ContainerHealthTask(
@@ -172,7 +174,8 @@ public class ContainerHealthTask extends ReconScmTask {
currentContainer = setCurrentContainer(rec.getContainerId());
}
if (ContainerHealthRecords
- .retainOrUpdateRecord(currentContainer, rec)) {
+ .retainOrUpdateRecord(currentContainer, rec
+ )) {
// Check if the missing container is deleted in SCM
if (currentContainer.isMissing() &&
containerDeletedInSCM(currentContainer.getContainer())) {
@@ -265,10 +268,13 @@ public class ContainerHealthTask extends ReconScmTask {
* If the record is to be retained, the fields in the record for actual
* replica count, delta and reason will be updated if their counts have
* changed.
- * @param container ContainerHealthStatus representing the health state of
- * the container.
- * @param rec Existing database record from the UnhealthyContainers table.
- * @return
+ *
+ * @param container ContainerHealthStatus representing the
+ * health state of the container.
+ * @param rec Existing database record from the
+ * UnhealthyContainers table.
+ * @return true if the unhealthy container record should be retained
+ * (possibly with updated fields), false otherwise
*/
public static boolean retainOrUpdateRecord(
ContainerHealthStatus container, UnhealthyContainersRecord rec) {
@@ -342,6 +348,7 @@ public class ContainerHealthTask extends ReconScmTask {
records.add(recordForState(
container, UnHealthyContainerStates.MIS_REPLICATED, time));
}
+
return records;
}
@@ -434,5 +441,4 @@ public class ContainerHealthTask extends ReconScmTask {
}
}
}
-
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index b93e04cbdd..3276b99ac5 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -29,20 +29,28 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.balancer.ContainerBalancer;
@@ -93,6 +101,11 @@ import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_EX
import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_EVENT_REPORT_QUEUE_WAIT_THRESHOLD_DEFAULT;
import static
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY;
+
import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReport;
import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
@@ -112,6 +125,7 @@ public class ReconStorageContainerManagerFacade
private static final Logger LOG = LoggerFactory
.getLogger(ReconStorageContainerManagerFacade.class);
+ public static final long CONTAINER_METADATA_SIZE = 1 * 1024 * 1024L;
private final OzoneConfiguration ozoneConfiguration;
private final ReconDatanodeProtocolServer datanodeProtocolServer;
@@ -133,6 +147,10 @@ public class ReconStorageContainerManagerFacade
private PlacementPolicy containerPlacementPolicy;
private HDDSLayoutVersionManager scmLayoutVersionManager;
+ private ScheduledExecutorService scheduler;
+
+ private AtomicBoolean isSyncDataFromSCMRunning;
+
@Inject
public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
StorageContainerServiceProvider scmServiceProvider,
@@ -185,6 +203,7 @@ public class ReconStorageContainerManagerFacade
containerHealthSchemaManager, reconContainerMetadataManager,
scmhaManager, sequenceIdGen, pendingOps);
this.scmServiceProvider = scmServiceProvider;
+ this.isSyncDataFromSCMRunning = new AtomicBoolean();
NodeReportHandler nodeReportHandler =
new NodeReportHandler(nodeManager);
@@ -326,6 +345,7 @@ public class ReconStorageContainerManagerFacade
"Recon ScmDatanodeProtocol RPC server",
getDatanodeProtocolServer().getDatanodeRpcAddress()));
}
+ scheduler = Executors.newScheduledThreadPool(1);
boolean isSCMSnapshotEnabled = ozoneConfiguration.getBoolean(
ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED,
ReconServerConfigKeys.OZONE_RECON_SCM_SNAPSHOT_ENABLED_DEFAULT);
@@ -335,6 +355,35 @@ public class ReconStorageContainerManagerFacade
} else {
initializePipelinesFromScm();
}
+ LOG.debug("Started the SCM Container Info sync scheduler.");
+ long interval = ozoneConfiguration.getTimeDuration(
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+ long initialDelay = ozoneConfiguration.getTimeDuration(
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY,
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ // This periodic sync with SCM container cache is needed because during
+ // the window when recon will be down and any container being added
+ // newly and went missing, that container will not be reported as missing
by
+ // recon till there is a difference of container count equivalent to
+ // threshold value defined in "ozone.recon.scm.container.threshold"
+ // between SCM container cache and recon container cache.
+ scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ boolean isSuccess = syncWithSCMContainerInfo();
+ if (!isSuccess) {
+ LOG.debug("SCM container info sync is already running.");
+ }
+ } catch (Throwable t) {
+ LOG.error("Unexpected exception while syncing data from SCM.", t);
+ } finally {
+ isSyncDataFromSCMRunning.compareAndSet(true, false);
+ }
+ },
+ initialDelay,
+ interval,
+ TimeUnit.MILLISECONDS);
getDatanodeProtocolServer().start();
this.reconScmTasks.forEach(ReconScmTask::start);
}
@@ -416,22 +465,102 @@ public class ReconStorageContainerManagerFacade
}
} catch (IOException e) {
LOG.error("Exception encountered while getting SCM DB.");
+ } finally {
+ isSyncDataFromSCMRunning.compareAndSet(true, false);
}
}
public void updateReconSCMDBWithNewSnapshot() throws IOException {
- DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- LOG.info("Got new checkpoint from SCM : " +
- dbSnapshot.getCheckpointLocation());
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+ DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ } else {
+ LOG.warn("SCM DB sync is already running.");
+ }
+ }
+
+ public boolean syncWithSCMContainerInfo()
+ throws IOException {
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
try {
- initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ List<ContainerInfo> containers = containerManager.getContainers();
+
+ long totalContainerCount = scmServiceProvider.getContainerCount(
+ HddsProtos.LifeCycleState.CLOSED);
+ long containerCountPerCall =
+ getContainerCountPerCall(totalContainerCount);
+ long startContainerId = 1;
+ long retrievedContainerCount = 0;
+ if (totalContainerCount > 0) {
+ while (retrievedContainerCount < totalContainerCount) {
+ List<ContainerInfo> listOfContainers = scmServiceProvider.
+ getListOfContainers(startContainerId,
+ Long.valueOf(containerCountPerCall).intValue(),
+ HddsProtos.LifeCycleState.CLOSED);
+ if (null != listOfContainers && listOfContainers.size() > 0) {
+ LOG.info("Got list of containers from SCM : " +
+ listOfContainers.size());
+ listOfContainers.forEach(containerInfo -> {
+ long containerID = containerInfo.getContainerID();
+ boolean isContainerPresentAtRecon =
+ containers.contains(containerInfo);
+ if (!isContainerPresentAtRecon) {
+ try {
+ ContainerWithPipeline containerWithPipeline =
+ scmServiceProvider.getContainerWithPipeline(
+ containerID);
+ containerManager.addNewContainer(containerWithPipeline);
+ } catch (IOException e) {
+ LOG.error("Could not get container with pipeline " +
+ "for container : {}", containerID);
+ } catch (TimeoutException e) {
+ LOG.error("Could not add new container {} in Recon " +
+ "container manager cache.", containerID);
+ }
+ }
+ });
+ startContainerId = listOfContainers.get(
+ listOfContainers.size() - 1).getContainerID() + 1;
+ } else {
+ LOG.info("No containers found at SCM in CLOSED state");
+ return false;
+ }
+ retrievedContainerCount += containerCountPerCall;
+ }
+ }
} catch (IOException e) {
LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ return false;
}
} else {
- LOG.error("Null snapshot location got from SCM.");
+ LOG.debug("SCM DB sync is already running.");
+ return false;
}
+ return true;
+ }
+
+ private long getContainerCountPerCall(long totalContainerCount) {
+ // Assumption of size of 1 container info object here is 1 MB
+ long containersMetaDataTotalRpcRespSizeMB =
+ CONTAINER_METADATA_SIZE * totalContainerCount;
+ long hadoopRPCSize = ozoneConfiguration.getInt(
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+ CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+ long containerCountPerCall = containersMetaDataTotalRpcRespSizeMB <=
+ hadoopRPCSize ? totalContainerCount :
+ Math.round(Math.floor(
+ hadoopRPCSize / (double) CONTAINER_METADATA_SIZE));
+ return containerCountPerCall;
}
private void deleteOldSCMDB() throws IOException {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
index cf57937a15..2ee3f6bbd7 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/StorageContainerServiceProvider.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
@@ -78,4 +79,24 @@ public interface StorageContainerServiceProvider {
* @return DBCheckpoint from SCM.
*/
DBCheckpoint getSCMDBSnapshot();
+
+ /**
+ * Get the list of containers from SCM. This is an RPC call.
+ *
+ * @param startContainerID the start container id
+ * @param count the number of containers to return
+ * @param state the containers in given state to be returned
+ * @return the list of containers from SCM in a given state
+ * @throws IOException
+ */
+ List<ContainerInfo> getListOfContainers(long startContainerID,
+ int count,
+ HddsProtos.LifeCycleState state)
+ throws IOException;
+
+ /**
+ * Requests SCM for container count for a given state.
+ * @return the number of containers in the given state in SCM.
+ */
+ long getContainerCount(HddsProtos.LifeCycleState state) throws IOException;
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
index 1550a89cd4..d4ceaec89f 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/StorageContainerServiceProviderImpl.java
@@ -41,6 +41,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.ha.InterSCMGrpcClient;
import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
@@ -155,6 +156,12 @@ public class StorageContainerServiceProviderImpl
return scmClient.getContainerCount();
}
+ @Override
+ public long getContainerCount(HddsProtos.LifeCycleState state)
+ throws IOException {
+ return scmClient.getContainerCount(state);
+ }
+
public String getScmDBSnapshotUrl() {
return scmDBSnapshotUrl;
}
@@ -215,4 +222,12 @@ public class StorageContainerServiceProviderImpl
}
return null;
}
+
+ @Override
+ public List<ContainerInfo> getListOfContainers(
+ long startContainerID, int count, HddsProtos.LifeCycleState state)
+ throws IOException {
+ return scmClient.getListOfContainers(startContainerID, count, state);
+ }
+
}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
index c88c2440f9..4e86ca9056 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTaskRecordGenerator.java
@@ -76,20 +76,25 @@ public class TestContainerHealthTaskRecordGenerator {
new ContainerHealthStatus(container, replicas, placementPolicy);
// Missing record should be retained
assertTrue(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, missingRecord()));
+ .retainOrUpdateRecord(status, missingRecord()
+ ));
// Under / Over / Mis replicated should not be retained as if a container
is
// missing then it is not in any other category.
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, underReplicatedRecord()));
+ .retainOrUpdateRecord(status, underReplicatedRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, overReplicatedRecord()));
+ .retainOrUpdateRecord(status, overReplicatedRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, misReplicatedRecord()));
+ .retainOrUpdateRecord(status, misReplicatedRecord()
+ ));
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
status = new ContainerHealthStatus(container, replicas, placementPolicy);
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, missingRecord()));
+ .retainOrUpdateRecord(status, missingRecord()
+ ));
}
@Test
@@ -109,11 +114,14 @@ public class TestContainerHealthTaskRecordGenerator {
// Missing / Over / Mis replicated should not be retained
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, missingRecord()));
+ .retainOrUpdateRecord(status, missingRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, overReplicatedRecord()));
+ .retainOrUpdateRecord(status, overReplicatedRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, misReplicatedRecord()));
+ .retainOrUpdateRecord(status, misReplicatedRecord()
+ ));
// Container is now replicated OK - should be removed.
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
@@ -139,11 +147,14 @@ public class TestContainerHealthTaskRecordGenerator {
// Missing / Over / Mis replicated should not be retained
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, missingRecord()));
+ .retainOrUpdateRecord(status, missingRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, underReplicatedRecord()));
+ .retainOrUpdateRecord(status, underReplicatedRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, misReplicatedRecord()));
+ .retainOrUpdateRecord(status, misReplicatedRecord()
+ ));
// Container is now replicated OK - should be removed.
replicas = generateReplicas(container, CLOSED, CLOSED, CLOSED);
@@ -173,11 +184,14 @@ public class TestContainerHealthTaskRecordGenerator {
// Missing / Over / Mis replicated should not be retained
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, missingRecord()));
+ .retainOrUpdateRecord(status, missingRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, underReplicatedRecord()));
+ .retainOrUpdateRecord(status, underReplicatedRecord()
+ ));
assertFalse(ContainerHealthTask.ContainerHealthRecords
- .retainOrUpdateRecord(status, overReplicatedRecord()));
+ .retainOrUpdateRecord(status, overReplicatedRecord()
+ ));
// Container is now placed OK - should be removed.
when(placementPolicy.validateContainerPlacement(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]