sumitagrawl commented on code in PR #3947:
URL: https://github.com/apache/ozone/pull/3947#discussion_r1020015205
##########
hadoop-ozone/dev-support/intellij/ozone-site.xml:
##########
@@ -71,4 +71,24 @@
<name>datanode.replication.port</name>
<value>0</value>
</property>
+ <property>
+ <name>ozone.scm.ratis.enable</name>
+ <value>false</value>
+ </property>
Review Comment:
It seems local changes were pushed; please revert this file.
##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3397,4 +3397,21 @@
optimum performance.
</description>
</property>
+
+ <property>
+ <name>ozone.recon.scm.snapshot.task.initial.delay</name>
+ <value>1m</value>
+ <tag>OZONE, RECON, OM</tag>
+ <description>
+ Initial delay in MINUTES by Recon to request SCM DB Snapshot.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.scm.snapshot.task.interval.delay</name>
+ <value>10m</value>
Review Comment:
The SCM snapshot query can be reduced to once a day after the first sync, as
Recon can construct the same view from DNs. This is just an additional
precaution to avoid load on SCM.
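To put the suggestion in numbers: with the proposed 10m default, the scheduled SCM query runs at most 144 times a day. A once-a-day interval reduces that to one. The small Java sketch below only does that arithmetic with `java.time.Duration`. The class and method names are illustrative and are not part of the PR.

```java
import java.time.Duration;

// Illustrative only: upper bound on how many scheduled SCM queries Recon
// issues per day for a given fixed-delay interval. It ignores the time each
// sync itself takes, which only lowers the real count under fixed delay.
class SnapshotRequestRate {
  static long requestsPerDay(Duration interval) {
    // Duration.dividedBy(Duration) (Java 9+) returns how many whole
    // intervals fit into one day.
    return Duration.ofDays(1).dividedBy(interval);
  }

  public static void main(String[] args) {
    System.out.println("10m interval: " + requestsPerDay(Duration.ofMinutes(10)));
    System.out.println("1d interval:  " + requestsPerDay(Duration.ofDays(1)));
  }
}
```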
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java:
##########
@@ -143,6 +143,19 @@ public final class ReconServerConfigKeys {
"ozone.recon.scm.connection.request.timeout";
public static final String
OZONE_RECON_SCM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY =
+ "ozone.recon.scm.snapshot.task.interval.delay";
+
+ public static final String OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT
+ = "10m";
Review Comment:
Same as the ozone-default.xml configuration: this periodic sync can run less
frequently, e.g. once a day.
##########
hadoop-hdds/common/src/main/resources/ozone-default.xml:
##########
@@ -3397,4 +3397,21 @@
optimum performance.
</description>
</property>
+
+ <property>
+ <name>ozone.recon.scm.snapshot.task.initial.delay</name>
+ <value>1m</value>
+ <tag>OZONE, RECON, OM</tag>
Review Comment:
The tag should be only RECON, right? OM is not applicable.
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -315,6 +339,33 @@ public void start() {
} else {
initializePipelinesFromScm();
}
+ LOG.debug("Started the SCM Container Info sync scheduler.");
+ long interval = ozoneConfiguration.getTimeDuration(
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DELAY,
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS);
+ long initialDelay = ozoneConfiguration.getTimeDuration(
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY,
+ OZONE_RECON_SCM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ // This periodic sync with SCM container cache is needed because during
+ // the window when recon will be down and any container being added
+ // newly and went missing, that container will not be reported as missing by
+ // recon till there is a difference of container count equivalent to
+ // threshold value defined in "ozone.recon.scm.container.threshold"
+ // between SCM container cache and recon container cache.
+ scheduler.scheduleWithFixedDelay(() -> {
Review Comment:
Can we refactor this into a background service implemented as a separate class?
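A minimal sketch of what such a separate class could look like, assuming the sync logic is handed in as a Runnable. The class name ReconScmSyncService and its methods are hypothetical, and System.err stands in for Recon's logger.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the periodic SCM sync moved out of
// ReconStorageContainerManagerFacade#start() into its own service class.
class ReconScmSyncService {
  private final Runnable syncTask;
  private final long initialDelayMs;
  private final long intervalMs;
  private ScheduledExecutorService scheduler;

  ReconScmSyncService(Runnable syncTask, long initialDelayMs, long intervalMs) {
    this.syncTask = syncTask;
    this.initialDelayMs = initialDelayMs;
    this.intervalMs = intervalMs;
  }

  synchronized void start() {
    if (scheduler != null) {
      return; // already started
    }
    scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread t = new Thread(r, "ReconScmSyncService");
      t.setDaemon(true); // do not keep the JVM alive on shutdown
      return t;
    });
    // With fixed delay, the next run starts only after the previous one ends,
    // so two syncs never overlap.
    scheduler.scheduleWithFixedDelay(this::runOnce, initialDelayMs, intervalMs,
        TimeUnit.MILLISECONDS);
  }

  // Exceptions are caught here: an exception escaping a task scheduled with
  // scheduleWithFixedDelay suppresses all of its subsequent executions.
  private void runOnce() {
    try {
      syncTask.run();
    } catch (RuntimeException e) {
      System.err.println("SCM sync failed: " + e); // stand-in for LOG.error
    }
  }

  synchronized void stop() throws InterruptedException {
    if (scheduler != null) {
      scheduler.shutdownNow();
      scheduler.awaitTermination(5, TimeUnit.SECONDS);
      scheduler = null;
    }
  }
}
```

The facade would then only construct the service with the configured initial delay and interval, call start() from its own start(), and stop() on shutdown. Since syncSCMContainerInfoWithReconContainerInfo() throws IOException, it would need a small lambda wrapper that handles the exception before it can be passed in as a Runnable.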
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -400,18 +451,200 @@ private void initializeSCMDB() {
}
public void updateReconSCMDBWithNewSnapshot() throws IOException {
- DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- LOG.info("Got new checkpoint from SCM : " +
- dbSnapshot.getCheckpointLocation());
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+ DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ } else {
+ LOG.warn("SCM DB sync is already running.");
+ }
+ }
+
+ public boolean syncSCMContainerInfoWithReconContainerInfo()
+ throws IOException {
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
Review Comment:
The block guarded by isSyncDataFromSCMRunning will never be executed, as the
flag is already set in updateReconSCMDBWithNewSnapshot and is not reset afterwards.
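One way to address this, sketched under the assumption that both methods are meant to share the same flag: acquire it with compareAndSet and always release it in a finally block, so a completed (or failed) sync does not block every later one. SyncGuard and runExclusively are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch (names hypothetical): a shared "sync in progress" flag that is
// always released, so updateReconSCMDBWithNewSnapshot() and
// syncSCMContainerInfoWithReconContainerInfo() can each acquire it later.
class SyncGuard {
  private final AtomicBoolean running = new AtomicBoolean(false);

  /** Runs the task if no other sync holds the flag; returns false if skipped. */
  boolean runExclusively(Runnable task) {
    if (!running.compareAndSet(false, true)) {
      return false; // another sync is in progress
    }
    try {
      task.run();
      return true;
    } finally {
      running.set(false); // release even if the task throws
    }
  }
}
```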
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -400,18 +451,200 @@ private void initializeSCMDB() {
}
public void updateReconSCMDBWithNewSnapshot() throws IOException {
- DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- LOG.info("Got new checkpoint from SCM : " +
- dbSnapshot.getCheckpointLocation());
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+ DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ } else {
+ LOG.warn("SCM DB sync is already running.");
+ }
+ }
+
+ public boolean syncSCMContainerInfoWithReconContainerInfo()
+ throws IOException {
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
try {
- initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ List<ContainerInfo> containers = containerManager.getContainers();
+ List<ContainerInfo> listOfContainers = scmServiceProvider.
+ getListOfContainers();
+ if (null != listOfContainers && listOfContainers.size() > 0) {
+ LOG.info("Got list of containers from SCM : " +
+ listOfContainers.size());
+ listOfContainers.forEach(containerInfo -> {
+ try {
+ long containerID = containerInfo.getContainerID();
+ List<HddsProtos.SCMContainerReplicaProto> containerReplicas
+ = scmServiceProvider.getContainerReplicas(containerID);
+ boolean isContainerPresentAtRecon =
+ containers.remove(containerID);
+ if (!isContainerPresentAtRecon) {
+ try {
+ ContainerWithPipeline containerWithPipeline =
+ scmServiceProvider.getContainerWithPipeline(containerID);
+ containerManager.addNewContainer(containerWithPipeline);
+ } catch (IOException e) {
+ LOG.error("Could not get container with pipeline " +
+ "for container : {}", containerID);
+ } catch (TimeoutException e) {
+ LOG.error("Could not add new container {} in Recon " +
+ "container manager cache.", containerID);
+ }
+ }
+ synchronized (containerInfo) {
Review Comment:
This lock is meaningless, as it is taken on a temporary object built from the
data received from SCM.
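The underlying rule: a synchronized block only serializes threads that lock the same instance. A ContainerInfo decoded from each SCM response is a fresh object, so no two callers ever contend on it. The sketch below shows the alternative of a lock owned by a long-lived component; which component should own it in Recon is an assumption left open, and SharedLockCounter is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the monitor is a single object shared by all callers, unlike a
// per-response ContainerInfo, so concurrent updates are actually serialized.
class SharedLockCounter {
  private final Object lock = new Object(); // one monitor for every caller
  private long updates;

  void update() {
    synchronized (lock) {
      updates++; // read-modify-write is atomic across threads
    }
  }

  long updates() {
    synchronized (lock) {
      return updates;
    }
  }

  /** Starts `threads` workers that each call update() `perThread` times. */
  static long runConcurrently(int threads, int perThread) throws InterruptedException {
    SharedLockCounter counter = new SharedLockCounter();
    List<Thread> workers = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      Thread t = new Thread(() -> {
        for (int j = 0; j < perThread; j++) {
          counter.update();
        }
      });
      workers.add(t);
      t.start();
    }
    for (Thread t : workers) {
      t.join();
    }
    return counter.updates();
  }
}
```

If update() instead synchronized on a new Object() created per call, the increments would race, which is the situation in the diff.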
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -400,18 +451,200 @@ private void initializeSCMDB() {
}
public void updateReconSCMDBWithNewSnapshot() throws IOException {
- DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- LOG.info("Got new checkpoint from SCM : " +
- dbSnapshot.getCheckpointLocation());
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+ DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ } else {
+ LOG.warn("SCM DB sync is already running.");
+ }
+ }
+
+ public boolean syncSCMContainerInfoWithReconContainerInfo()
+ throws IOException {
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
try {
- initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ List<ContainerInfo> containers = containerManager.getContainers();
+ List<ContainerInfo> listOfContainers = scmServiceProvider.
+ getListOfContainers();
+ if (null != listOfContainers && listOfContainers.size() > 0) {
+ LOG.info("Got list of containers from SCM : " +
+ listOfContainers.size());
+ listOfContainers.forEach(containerInfo -> {
+ try {
+ long containerID = containerInfo.getContainerID();
+ List<HddsProtos.SCMContainerReplicaProto> containerReplicas
+ = scmServiceProvider.getContainerReplicas(containerID);
Review Comment:
This will block communication for a long time if there are 10,000 or more
containers, including closed ones.
We need something like an FCR (full container report) from SCM, fetched
iteratively, to provide this container information.
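To illustrate the iterative part: instead of one RPC per container for every container, Recon could walk SCM's container list in fixed-size pages and process each page before requesting the next. The ContainerPageSource interface below is hypothetical (an in-memory stand-in is included for testing). Whether SCM exposes a suitable paged call, and how replica data would be batched alongside it, are assumptions to verify; this sketch does not implement the FCR-style report itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical paged source: returns up to `count` container IDs that are
// >= startId, in ascending order. Not an actual Ozone API.
interface ContainerPageSource {
  List<Long> listContainerIds(long startId, int count);
}

class PagedContainerSync {
  /** Visits every container ID in batches; returns the number of page requests made. */
  static int forEachBatch(ContainerPageSource source, int batchSize,
      Consumer<List<Long>> batchHandler) {
    long start = 0;
    int calls = 0;
    while (true) {
      List<Long> page = source.listContainerIds(start, batchSize);
      calls++;
      if (page.isEmpty()) {
        return calls; // nothing left
      }
      batchHandler.accept(page); // e.g. fetch replica info for this batch only
      if (page.size() < batchSize) {
        return calls; // last, partial page
      }
      start = page.get(page.size() - 1) + 1; // resume after the last ID seen
    }
  }

  // In-memory stand-in for SCM, used for the example and tests.
  static ContainerPageSource inMemory(TreeSet<Long> ids) {
    return (startId, count) -> {
      List<Long> out = new ArrayList<>();
      for (Long id : ids.tailSet(startId, true)) {
        if (out.size() == count) {
          break;
        }
        out.add(id);
      }
      return out;
    };
  }
}
```

Resuming from the last seen ID keeps each request bounded by batchSize and avoids holding the whole container list, or one long-running call, on either side.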
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java:
##########
@@ -400,18 +451,200 @@ private void initializeSCMDB() {
}
public void updateReconSCMDBWithNewSnapshot() throws IOException {
- DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
- if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
- LOG.info("Got new checkpoint from SCM : " +
- dbSnapshot.getCheckpointLocation());
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
+ DBCheckpoint dbSnapshot = scmServiceProvider.getSCMDBSnapshot();
+ if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
+ LOG.info("Got new checkpoint from SCM : " +
+ dbSnapshot.getCheckpointLocation());
+ try {
+ initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ } catch (IOException e) {
+ LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ }
+ } else {
+ LOG.error("Null snapshot location got from SCM.");
+ }
+ } else {
+ LOG.warn("SCM DB sync is already running.");
+ }
+ }
+
+ public boolean syncSCMContainerInfoWithReconContainerInfo()
+ throws IOException {
+ if (isSyncDataFromSCMRunning.compareAndSet(false, true)) {
try {
- initializeNewRdbStore(dbSnapshot.getCheckpointLocation().toFile());
+ List<ContainerInfo> containers = containerManager.getContainers();
+ List<ContainerInfo> listOfContainers = scmServiceProvider.
+ getListOfContainers();
+ if (null != listOfContainers && listOfContainers.size() > 0) {
+ LOG.info("Got list of containers from SCM : " +
+ listOfContainers.size());
+ listOfContainers.forEach(containerInfo -> {
+ try {
+ long containerID = containerInfo.getContainerID();
+ List<HddsProtos.SCMContainerReplicaProto> containerReplicas
+ = scmServiceProvider.getContainerReplicas(containerID);
+ boolean isContainerPresentAtRecon =
+ containers.remove(containerID);
+ if (!isContainerPresentAtRecon) {
+ try {
+ ContainerWithPipeline containerWithPipeline =
+ scmServiceProvider.getContainerWithPipeline(containerID);
+ containerManager.addNewContainer(containerWithPipeline);
+ } catch (IOException e) {
+ LOG.error("Could not get container with pipeline " +
+ "for container : {}", containerID);
+ } catch (TimeoutException e) {
+ LOG.error("Could not add new container {} in Recon " +
+ "container manager cache.", containerID);
+ }
+ }
+ synchronized (containerInfo) {
+ containerReplicas.forEach(containerReplicaProto -> {
+ final ContainerReplica replica = buildContainerReplica(
+ containerInfo, containerReplicaProto);
+ try {
+ updateContainerState(containerInfo, replica);
+ containerManager.updateContainerReplica(
+ containerInfo.containerID(), replica);
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Could not update container replica as " +
+ "container {} not found.", containerID);
+ } catch (InvalidStateTransitionException e) {
+ LOG.error("Invalid state transition for container {}",
+ containerID);
+ } catch (IOException e) {
+ LOG.error("Could not update container {} state.",
+ containerID);
+ } catch (TimeoutException e) {
+ LOG.error("Timeout while updating container {} state.",
+ containerID);
+ }
+ });
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to get container replicas for container : {}",
+ containerInfo.getContainerID());
+ }
+ });
+ } else {
+ LOG.debug("SCM DB sync is already running.");
+ return false;
+ }
} catch (IOException e) {
LOG.error("Unable to refresh Recon SCM DB Snapshot. ", e);
+ return false;
}
- } else {
- LOG.error("Null snapshot location got from SCM.");
}
+ return true;
+ }
+
+ private boolean updateContainerState(ContainerInfo container,
+ ContainerReplica replica)
+ throws InvalidStateTransitionException, IOException, TimeoutException {
+ final ContainerID containerId = container.containerID();
Review Comment:
I think we need not update the container state based on replicas; this is done
automatically by the FCR from DNs. The task here is to get information about
missing containers, i.e. containers present at SCM but not present at DN.
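A minimal sketch of that narrower task: compute the container IDs that SCM knows about but Recon does not, using a hash set for membership checks. Plain long IDs stand in for ContainerInfo/ContainerID here, and MissingContainerDiff is a hypothetical name. Comparing IDs explicitly may also be worth doing in the PR: as written, containers.remove(containerID) passes a long to a List<ContainerInfo>, which resolves to List.remove(Object) with a boxed Long that can never equal a ContainerInfo element.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch: container IDs present at SCM but absent from Recon's view.
// Long IDs stand in for ContainerInfo/ContainerID (assumption).
class MissingContainerDiff {
  static Set<Long> presentAtScmOnly(Collection<Long> scmIds, Collection<Long> reconIds) {
    Set<Long> recon = new HashSet<>(reconIds); // O(1) membership checks
    Set<Long> missing = new TreeSet<>();       // sorted, for stable logging
    for (Long id : scmIds) {
      if (!recon.contains(id)) {
        missing.add(id);
      }
    }
    return missing;
  }
}
```

Only the IDs returned here would then need a follow-up call to SCM (for example to fetch the container with its pipeline), rather than every container in the cluster.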
--
This is an automated message from the Apache Git Service.