umamaheswararao commented on code in PR #3545:
URL: https://github.com/apache/ozone/pull/3545#discussion_r906293237
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -233,24 +229,77 @@ public synchronized void processAll() {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+ new ArrayList<>();
+ List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+ new ArrayList<>();
+
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
}
- switch (c.getReplicationType()) {
- case EC:
- break;
- default:
+ report.increment(c.getState());
+ if (c.getReplicationType() != EC) {
legacyReplicationManager.processContainer(c, report);
+ continue;
+ }
+ try {
+ processContainer(c, underReplicated, overReplicated, report);
Review Comment:
What do you mean by the TODO below? I thought we would just populate the
under- and over-replicated lists above and later process them with queues at
the TODO on line 254.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -233,24 +229,77 @@ public synchronized void processAll() {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+ new ArrayList<>();
+ List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+ new ArrayList<>();
+
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
}
- switch (c.getReplicationType()) {
- case EC:
- break;
- default:
+ report.increment(c.getState());
+ if (c.getReplicationType() != EC) {
legacyReplicationManager.processContainer(c, report);
+ continue;
+ }
+ try {
+ processContainer(c, underReplicated, overReplicated, report);
+ // TODO - send any commands contained in the health result
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Container {} not found", c.getContainerID(), e);
}
}
report.setComplete();
+ // TODO - Sort the pending lists by priority and assign to the main queue,
+ // which is yet to be defined.
this.containerReport = report;
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}
+ protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+ List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
+ ReplicationManagerReport report) throws ContainerNotFoundException {
Review Comment:
The variable declaration and assignment can be in the same statement.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -233,24 +229,77 @@ public synchronized void processAll() {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+ new ArrayList<>();
+ List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+ new ArrayList<>();
+
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
}
- switch (c.getReplicationType()) {
- case EC:
- break;
- default:
+ report.increment(c.getState());
+ if (c.getReplicationType() != EC) {
legacyReplicationManager.processContainer(c, report);
+ continue;
+ }
+ try {
+ processContainer(c, underReplicated, overReplicated, report);
Review Comment:
What do you mean by the TODO below? I thought we would just populate the
under- and over-replicated lists above and later process them with queues at
the TODO on line 254.
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java:
##########
@@ -233,24 +229,77 @@ public synchronized void processAll() {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
+ new ArrayList<>();
+ List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
+ new ArrayList<>();
+
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
}
- switch (c.getReplicationType()) {
- case EC:
- break;
- default:
+ report.increment(c.getState());
+ if (c.getReplicationType() != EC) {
legacyReplicationManager.processContainer(c, report);
+ continue;
+ }
+ try {
+ processContainer(c, underReplicated, overReplicated, report);
+ // TODO - send any commands contained in the health result
+ } catch (ContainerNotFoundException e) {
+ LOG.error("Container {} not found", c.getContainerID(), e);
}
}
report.setComplete();
+ // TODO - Sort the pending lists by priority and assign to the main queue,
+ // which is yet to be defined.
this.containerReport = report;
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}
+ protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
+ List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+ List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
+ ReplicationManagerReport report) throws ContainerNotFoundException {
+ Set<ContainerReplica> replicas;
+ replicas = containerManager.getContainerReplicas(
+ containerInfo.containerID());
+ List<ContainerReplicaOp> pendingOps =
+ containerReplicaPendingOps.getPendingOps(containerInfo.containerID());
+ ContainerHealthResult health = ecContainerHealthCheck
+ .checkHealth(containerInfo, replicas, pendingOps, 0);
+ // TODO - should the report have a HEALTHY state, rather than just bad
+ // states? It would need to be added to legacy RM too.
+ if (health.getHealthState()
+ == ContainerHealthResult.HealthState.UNDER_REPLICATED) {
+ report.incrementAndSample(
+ HealthState.UNDER_REPLICATED, containerInfo.containerID());
+ ContainerHealthResult.UnderReplicatedHealthResult underHealth
+ = ((ContainerHealthResult.UnderReplicatedHealthResult) health);
+ if (underHealth.isUnrecoverable()) {
+ // TODO - do we need a new health state for unrecoverable EC?
+ report.incrementAndSample(
+ HealthState.MISSING, containerInfo.containerID());
+ }
+ if (!underHealth.isSufficientlyReplicatedAfterPending() &&
+ !underHealth.isUnrecoverable()) {
+ underRep.add(underHealth);
+ }
+ } else if (health.getHealthState()
Review Comment:
I am wondering: in the EC case, a container can be in both situations at once,
with some indexes over-replicated while other replicas are missing. In that
situation, what is the HealthCheck result? Should we give preference to the
missing replicas, meaning under-replication?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]