steveloughran commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r827956606
########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/LoadManifestsStage.java ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.fs.statistics.IOStatisticsSource; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.util.functional.TaskPool; + +import static org.apache.commons.io.FileUtils.byteCountToDisplaySize; +import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_MANIFEST_FILE_SIZE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics; + +/** + * Stage to load all the task manifests in the job attempt directory. + * Invoked in Job Commit. + * Manifests are loaded in parallel. + * The IOStatistics snapshot passed in is built up with the statistics + * and the statistics stripped from the manifest if prune == true. + * This keeps the memory footprint of each manifest down. 
+ */ +public class LoadManifestsStage extends + AbstractJobCommitStage<Boolean, LoadManifestsStage.Result> { + + private static final Logger LOG = LoggerFactory.getLogger( + LoadManifestsStage.class); + + /** + * Summary of manifest loading. + */ + private final SummaryInfo summaryInfo = new SummaryInfo(); + + /** + * Should manifests be pruned of IOStatistics? + */ + private boolean pruneManifests; + + /** + * List of loaded manifests. + */ + private final List<TaskManifest> manifests = new ArrayList<>(); + + public LoadManifestsStage(final StageConfig stageConfig) { + super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true); + } + + /** + * Load the manifests. + * @param prune should manifests be pruned of IOStatistics? + * @return the summary and a list of manifests. + * @throws IOException IO failure. + */ + @Override + protected LoadManifestsStage.Result executeStage( + final Boolean prune) throws IOException { + + final Path manifestDir = getTaskManifestDir(); + LOG.info("{}: Executing Manifest Job Commit with manifests in {}", + getName(), + manifestDir); + pruneManifests = prune; + // build a list of all task manifests successfully committed + // + msync(manifestDir); + final RemoteIterator<FileStatus> manifestFiles = listManifests(); + + final List<TaskManifest> manifestList = loadAllManifests(manifestFiles); + LOG.info("{}: Summary of {} manifests loaded in {}: {}", + getName(), + manifestList.size(), + manifestDir, + summaryInfo); + + // collect any stats + maybeAddIOStatistics(getIOStatistics(), manifestFiles); + return new LoadManifestsStage.Result(summaryInfo, manifestList); + } + + /** + * Load all the manifests. + * @param manifestFiles list of manifest files. + * @return the loaded manifests. + * @throws IOException IO Failure. 
+ */ + private List<TaskManifest> loadAllManifests( + final RemoteIterator<FileStatus> manifestFiles) throws IOException { + + trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () -> + TaskPool.foreach(manifestFiles) + .executeWith(getIOProcessors()) + .stopOnFailure() + .run(this::processOneManifest)); + return manifests; + } + + /** + * Method invoked to process one manifest. + * @param status file to process. + * @throws IOException failure to load/parse + */ + private void processOneManifest(FileStatus status) + throws IOException { + updateAuditContext(OP_LOAD_ALL_MANIFESTS); + + TaskManifest m = fetchTaskManifest(status); + progress(); + + // update the manifest list in a synchronized block. + + synchronized (manifests) { + manifests.add(m); Review comment: Surfaced on the S3A committer only on multi-TB terasorts... but there each file included a list of etags, so was much bigger. Merging all manifest dir lists lets us optimise dir creation; to do that and still do independent manifest committing of work would require double loading of manifests, one for dir setup and another for renaming. I did start off with some parallel work here, but it is both complex (need two thread pools to avoid deadlocks) and didn't seem tangibly more efficient. If it surfaces in real-world jobs then I can worry about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
