Repository: crunch Updated Branches: refs/heads/master 86ecd82d9 -> 46b33437a
CRUNCH-580: Use thread pools in org.apache.crunch.io.impl.FileTargetImpl#handleOutputs for file renaming. Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/46b33437 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/46b33437 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/46b33437 Branch: refs/heads/master Commit: 46b33437ad19fea88446ceb18ed34e4f3f88a7eb Parents: 86ecd82 Author: Jeff Quinn <[email protected]> Authored: Wed Nov 25 10:11:04 2015 -0800 Committer: Josh Wills <[email protected]> Committed: Wed Dec 9 20:45:36 2015 -0800 ---------------------------------------------------------------------- .../crunch/impl/mr/run/RuntimeParameters.java | 2 + .../apache/crunch/io/impl/FileTargetImpl.java | 71 ++++++++++++++++++-- .../crunch/io/impl/FileTargetImplTest.java | 62 +++++++++++++++++ 3 files changed, 129 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 07abf11..fe6f7ee 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -43,6 +43,8 @@ public final class RuntimeParameters { public static final String MAX_RUNNING_JOBS = "crunch.max.running.jobs"; + public static final String FILE_TARGET_MAX_THREADS = "crunch.file.target.max.threads"; + // Not instantiated private RuntimeParameters() { } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index e87485d..5f4cfbb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -18,16 +18,28 @@ package org.apache.crunch.io.impl; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.plan.PlanningParameters; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FileNamingScheme; import org.apache.crunch.io.FormatBundle; @@ -118,6 +130,35 @@ public class FileTargetImpl implements PathTarget { return ptype.getConverter(); } + private class WorkingPathFileMover implements Callable<Boolean> { + private Configuration conf; + private Path src; + private Path dst; + private FileSystem srcFs; + private 
FileSystem dstFs; + private boolean sameFs; + + + public WorkingPathFileMover(Configuration conf, Path src, Path dst, + FileSystem srcFs, FileSystem dstFs, boolean sameFs) { + this.conf = conf; + this.src = src; + this.dst = dst; + this.srcFs = srcFs; + this.dstFs = dstFs; + this.sameFs = sameFs; + } + + @Override + public Boolean call() throws IOException { + if (sameFs) { + return srcFs.rename(src, dst); + } else { + return FileUtil.copy(srcFs, src, dstFs, dst, true, true, conf); + } + } + } + @Override public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException { FileSystem srcFs = workingPath.getFileSystem(conf); @@ -128,15 +169,33 @@ public class FileTargetImpl implements PathTarget { dstFs.mkdirs(path); } boolean sameFs = isCompatible(srcFs, path); + List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList(); + ListeningExecutorService executorService = + MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool( + conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1))); for (Path s : srcs) { Path d = getDestFile(conf, s, path, s.getName().contains("-m-")); - if (sameFs) { - srcFs.rename(s, d); - } else { - FileUtil.copy(srcFs, s, dstFs, d, true, true, conf); - } + renameFutures.add( + executorService.submit( + new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs))); + } + LOG.debug("Renaming " + renameFutures.size() + " files."); + + ListenableFuture<List<Boolean>> future = + Futures.successfulAsList(renameFutures); + List<Boolean> renameResults = null; + try { + renameResults = future.get(); + } catch (InterruptedException | ExecutionException e) { + Throwables.propagate(e); + } finally { + executorService.shutdownNow(); + } + if (renameResults != null && !renameResults.contains(false)) { + dstFs.create(getSuccessIndicator(), true).close(); + LOG.debug("Renamed " + renameFutures.size() + " files."); } - dstFs.create(getSuccessIndicator(), true).close(); } protected Path getSuccessIndicator() { 
http://git-wip-us.apache.org/repos/asf/crunch/blob/46b33437/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java new file mode 100644 index 0000000..6bc13d2 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link FileTargetImpl}. */
+public class FileTargetImplTest {
+  @Rule
+  public TemporaryFolder TMP = new TemporaryFolder();
+
+  /** Part files under the working path must end up, unchanged, in the target directory. */
+  @Test
+  public void testHandleOutputsMovesFilesToDestination() throws Exception {
+    java.nio.file.Path testWorkingPath = TMP.newFolder().toPath();
+    java.nio.file.Path testDestinationPath = TMP.newFolder().toPath();
+    FileTargetImpl fileTarget = new FileTargetImpl(
+        new Path(testDestinationPath.toAbsolutePath().toString()),
+        SequenceFileOutputFormat.class,
+        SequentialFileNamingScheme.getInstance());
+    // Seed the working directory with two part files (explicit charset, not platform default).
+    File testPart1 = new File(testWorkingPath.toAbsolutePath().toString(), "part-m-00000");
+    File testPart2 = new File(testWorkingPath.toAbsolutePath().toString(), "part-m-00001");
+    FileUtils.writeStringToFile(testPart1, "test1", "UTF-8");
+    FileUtils.writeStringToFile(testPart2, "test2", "UTF-8");
+    fileTarget.handleOutputs(new Configuration(),
+        new Path(testWorkingPath.toAbsolutePath().toString()),
+        -1);
+    // assertEquals takes (expected, actual) so failure messages report the right side.
+    assertEquals("test1",
+        FileUtils.readFileToString(
+            new File(testDestinationPath.toAbsolutePath().toString(), "part-m-00000"), "UTF-8"));
+    assertEquals("test2",
+        FileUtils.readFileToString(
+            new File(testDestinationPath.toAbsolutePath().toString(), "part-m-00001"), "UTF-8"));
+  }
+}
