[
https://issues.apache.org/jira/browse/HDFS-11182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15717913#comment-15717913
]
Manjunath commented on HDFS-11182:
----------------------------------
Arpit Agarwal, sorry for the late code review; this comment really applies more
to JIRA HDFS-11149. Thanks for the effort on this change. In
DatasetVolumeChecker, checkAllVolumes and checkAllVolumesAsync are supposed to
do the same work, except for a few additional steps in checkAllVolumes. The
code could be modified so that checkAllVolumes calls checkAllVolumesAsync
internally, which would save lines of code and ensure that both methods keep
doing the same intended work without duplicating code in the future. For
example:
public ResultHandler checkAllVolumesAsync(
    final FsDatasetSpi<? extends FsVolumeSpi> dataset,
    Callback callback) {
  if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) {
    numSkippedChecks.incrementAndGet();
    return null;
  }

  lastAllVolumesCheck = timer.monotonicNow();
  final Set<StorageLocation> healthyVolumes = new HashSet<>();
  final Set<StorageLocation> failedVolumes = new HashSet<>();
  final Set<StorageLocation> allVolumes = new HashSet<>();

  final FsDatasetSpi.FsVolumeReferences references =
      dataset.getFsVolumeReferences();
  final CountDownLatch latch = new CountDownLatch(references.size());

  LOG.info("Checking {} volumes", references.size());
  for (int i = 0; i < references.size(); ++i) {
    final FsVolumeReference reference = references.getReference(i);
    allVolumes.add(reference.getVolume().getStorageLocation());
    // The context parameter is currently ignored.
    ListenableFuture<VolumeCheckResult> future =
        delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
    LOG.info("Scheduled health check for volume {}",
        reference.getVolume());
    Futures.addCallback(future, new ResultHandler(
        reference, healthyVolumes, failedVolumes, latch, callback));
  }

  numAsyncDatasetChecks.incrementAndGet();
  return new ResultHandler(
      null, healthyVolumes, failedVolumes, latch, callback);
}
public Set<StorageLocation> checkAllVolumes(
    final FsDatasetSpi<? extends FsVolumeSpi> dataset)
    throws InterruptedException {
  ResultHandler resultHandler = checkAllVolumesAsync(dataset, null);
  if (resultHandler == null) {
    return Collections.emptySet();
  }

  // Wait until our timeout elapses, after which we give up on
  // the remaining volumes.
  if (!resultHandler.getResultsLatch().await(maxAllowedTimeForCheckMs,
      TimeUnit.MILLISECONDS)) {
    LOG.warn("checkAllVolumes timed out after {} ms",
        maxAllowedTimeForCheckMs);
  }

  numSyncDatasetChecks.incrementAndGet();
  synchronized (this) {
    // All volumes that have not been detected as healthy should be
    // considered failed. This is a superset of 'failedVolumes'.
    //
    // Make a copy under the mutex as Sets.difference() returns a view
    // of a potentially changing set.
    return new HashSet<>(Sets.difference(resultHandler.getAllVolumes(),
        resultHandler.getHealthyVolumes()));
  }
}
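For the refactored checkAllVolumes above to work, ResultHandler would need to
expose the latch and the volume sets it tracks, and the summary handler
returned from checkAllVolumesAsync would also need to hold the allVolumes set.
The accessor names and the extra allVolumes field below are only a sketch of
what I am assuming, not something already present in the patch:

// Sketch of additions assumed inside DatasetVolumeChecker.ResultHandler;
// the allVolumes field and these getter names are placeholders, and the
// summary handler would need a constructor (or setter) that passes in
// allVolumes.
private final Set<StorageLocation> allVolumes;   // assumed extra field

Set<StorageLocation> getAllVolumes() {
  return allVolumes;
}

Set<StorageLocation> getHealthyVolumes() {
  return healthyVolumes;
}

CountDownLatch getResultsLatch() {
  return latch;
}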
Although this is not a major change, it would eliminate the duplicated code and
the differences in core behaviour that currently exist between the two methods,
for example in their logging: LOG.info("Scheduled health check for
volume {}", reference.getVolume()); is present only in checkAllVolumes, whereas
LOG.info("Checking {} volumes", references.size()); is present only in
checkAllVolumesAsync. Please let me know your thoughts on this.
> Update DataNode to use DatasetVolumeChecker
> -------------------------------------------
>
> Key: HDFS-11182
> URL: https://issues.apache.org/jira/browse/HDFS-11182
> Project: Hadoop HDFS
> Issue Type: Sub-task
> Components: datanode
> Reporter: Arpit Agarwal
> Assignee: Arpit Agarwal
>
> Update DataNode to use the DatasetVolumeChecker class introduced in
> HDFS-11149 to parallelize disk checks.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]