This is an automated email from the ASF dual-hosted git repository.
jwills pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git
The following commit(s) were added to refs/heads/master by this push:
new 07458f7 CRUNCH-660, CRUNCH-675: Use DistCp instead of FileUtils.copy when source and destination paths are in different filesystems
07458f7 is described below
commit 07458f78282e1b55aee90960818f5fcb35dae5c0
Author: Andrew Olson <[email protected]>
AuthorDate: Wed Jan 23 11:23:57 2019 -0600
CRUNCH-660, CRUNCH-675: Use DistCp instead of FileUtils.copy when source and destination paths are in different filesystems
Signed-off-by: Josh Wills <[email protected]>
---
crunch-core/pom.xml | 6 ++
.../crunch/impl/mr/run/RuntimeParameters.java | 4 ++
.../org/apache/crunch/io/impl/FileTargetImpl.java | 81 ++++++++++++++++++++--
crunch-hbase/pom.xml | 6 ++
.../org/apache/crunch/io/hbase/HFileTarget.java | 71 +++++++++++++++++++
pom.xml | 6 ++
6 files changed, 167 insertions(+), 7 deletions(-)
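
For reference, a minimal usage sketch of the two runtime parameters added by this commit (the property names and defaults are taken from RuntimeParameters and FileTargetImpl below; the wrapping class and how the Configuration reaches the pipeline are assumptions, not part of the commit):

    import org.apache.hadoop.conf.Configuration;

    // Illustrative sketch only: property names and defaults come from this commit;
    // the class and pipeline wiring are hypothetical.
    public class DistCpOutputConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Use DistCp when source and destination are in different filesystems
        // (true is the default introduced here).
        conf.setBoolean("crunch.file.target.use.distcp", true);
        // Cap the number of DistCp map tasks; the default is 1000.
        conf.setInt("crunch.file.target.max.distcp.tasks", 500);
        // Hand conf to the Crunch pipeline as usual.
      }
    }
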
diff --git a/crunch-core/pom.xml b/crunch-core/pom.xml
index 4b41203..cd77373 100644
--- a/crunch-core/pom.xml
+++ b/crunch-core/pom.xml
@@ -67,6 +67,12 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Override the slf4j dependency from Avro, which is incompatible with
Hadoop's. -->
<dependency>
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index a36b910..f8b1e76 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -47,6 +47,10 @@ public final class RuntimeParameters {
public static final String MAX_POLL_INTERVAL = "crunch.max.poll.interval";
+  public static final String FILE_TARGET_USE_DISTCP = "crunch.file.target.use.distcp";
+
+  public static final String FILE_TARGET_MAX_DISTCP_TASKS = "crunch.file.target.max.distcp.tasks";
+
// Not instantiated
private RuntimeParameters() {
}
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 5f4cfbb..e8b1dfe 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,6 +18,7 @@
package org.apache.crunch.io.impl;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -29,7 +30,6 @@ import java.util.regex.Pattern;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -54,6 +54,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -162,26 +164,51 @@ public class FileTargetImpl implements PathTarget {
@Override
  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
- Path src = getSourcePattern(workingPath, index);
- Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
FileSystem dstFs = path.getFileSystem(conf);
if (!dstFs.exists(path)) {
dstFs.mkdirs(path);
}
+ Path srcPattern = getSourcePattern(workingPath, index);
boolean sameFs = isCompatible(srcFs, path);
+    boolean useDistributedCopy = conf.getBoolean(RuntimeParameters.FILE_TARGET_USE_DISTCP, true);
+    int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+ int maxThreads = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1);
+
+ if (!sameFs) {
+ if (useDistributedCopy) {
+ LOG.info("Source and destination are in different file systems,
performing distributed copy from {} to {}", srcPattern,
+ path);
+ handeOutputsDistributedCopy(conf, srcPattern, srcFs, dstFs,
maxDistributedCopyTasks);
+ } else {
+ LOG.info("Source and destination are in different file systems,
performing asynch copies from {} to {}", srcPattern, path);
+ handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs,
maxThreads);
+ }
+ } else {
+ LOG.info("Source and destination are in the same file system, performing
asynch renames from {} to {}", srcPattern, path);
+ handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs,
maxThreads);
+ }
+
+ }
+
+  private void handleOutputsAsynchronously(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
+      boolean sameFs, int maxThreads) throws IOException {
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
- conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1)));
+ maxThreads));
for (Path s : srcs) {
Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
renameFutures.add(
executorService.submit(
new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
}
- LOG.debug("Renaming " + renameFutures.size() + " files.");
-
+ if (sameFs) {
+ LOG.info("Renaming {} files using at most {} threads.",
renameFutures.size(), maxThreads);
+ } else {
+ LOG.info("Copying {} files using at most {} threads.",
renameFutures.size(), maxThreads);
+ }
ListenableFuture<List<Boolean>> future =
Futures.successfulAsList(renameFutures);
List<Boolean> renameResults = null;
@@ -193,9 +220,49 @@ public class FileTargetImpl implements PathTarget {
executorService.shutdownNow();
}
if (renameResults != null && !renameResults.contains(false)) {
+ if (sameFs) {
+ LOG.info("Renamed {} files.", renameFutures.size());
+ } else {
+ LOG.info("Copied {} files.", renameFutures.size());
+ }
dstFs.create(getSuccessIndicator(), true).close();
- LOG.debug("Renamed " + renameFutures.size() + " files.");
+ LOG.info("Created success indicator file");
+ }
+ }
+
+  private void handleOutputsDistributedCopy(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
+      int maxDistributedCopyTasks) throws IOException {
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
+ if (srcs.length > 0) {
+ LOG.info("Distributed copying {} files using at most {} tasks",
srcs.length, maxDistributedCopyTasks);
+      // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+ // distcp optimization if the target path is in S3
+ DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+ options.setMaxMaps(maxDistributedCopyTasks);
+ options.setOverwrite(true);
+ options.setBlocking(true);
+
+ Configuration distCpConf = new Configuration(conf);
+      // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+      // files referenced by these properties may have already been deleted when the DistCp is being started.
+ distCpConf.unset("mapreduce.job.cache.files");
+ distCpConf.unset("mapreduce.job.classpath.files");
+ distCpConf.unset("tmpjars");
+
+ try {
+ DistCp distCp = new DistCp(distCpConf, options);
+ if (!distCp.execute().isSuccessful()) {
+ throw new CrunchRuntimeException("Distributed copy failed from " +
srcPattern + " to " + path);
+ }
+ LOG.info("Distributed copy completed for {} files", srcs.length);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Distributed copy failed from " +
srcPattern + " to " + path, e);
+ }
+ } else {
+ LOG.info("No files found to distributed copy at {}", srcPattern);
}
+ dstFs.create(getSuccessIndicator(), true).close();
+ LOG.info("Created success indicator file");
}
protected Path getSuccessIndicator() {
diff --git a/crunch-hbase/pom.xml b/crunch-hbase/pom.xml
index 075b197..cc0ec7d 100644
--- a/crunch-hbase/pom.xml
+++ b/crunch-hbase/pom.xml
@@ -56,6 +56,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<type>jar</type>
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
index b1ce5ba..420c9dd 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java
@@ -17,12 +17,16 @@
*/
package org.apache.crunch.io.hbase;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -30,9 +34,18 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
public class HFileTarget extends FileTargetImpl {
+ private static final Logger LOG = LoggerFactory.getLogger(HFileTarget.class);
+
public HFileTarget(String path) {
this(new Path(path));
}
@@ -78,6 +91,64 @@ public class HFileTarget extends FileTargetImpl {
}
@Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
+ FileSystem srcFs = workingPath.getFileSystem(conf);
+ Path src = getSourcePattern(workingPath, index);
+ Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+ FileSystem dstFs = path.getFileSystem(conf);
+ if (!dstFs.exists(path)) {
+ dstFs.mkdirs(path);
+ }
+ boolean sameFs = isCompatible(srcFs, path);
+
+ if (!sameFs) {
+ if (srcs.length > 0) {
+        int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
+ LOG.info(
+ "Source and destination are in different file systems,
performing distcp of {} files from [{}] to [{}] "
+ + "using at most {} tasks",
+ new Object[] { srcs.length, src, path, maxDistributedCopyTasks
});
+        // Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
+ // distcp optimization if the target path is in S3
+ DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
+ options.setMaxMaps(maxDistributedCopyTasks);
+ options.setOverwrite(true);
+ options.setBlocking(true);
+
+ Configuration distCpConf = new Configuration(conf);
+        // Remove unnecessary and problematic properties from the DistCp configuration. This is necessary since
+        // files referenced by these properties may have already been deleted when the DistCp is being started.
+ distCpConf.unset("mapreduce.job.cache.files");
+ distCpConf.unset("mapreduce.job.classpath.files");
+ distCpConf.unset("tmpjars");
+
+ try {
+ DistCp distCp = new DistCp(distCpConf, options);
+ if (!distCp.execute().isSuccessful()) {
+ throw new CrunchRuntimeException("Unable to move files through
distcp from " + src + " to " + path);
+ }
+ LOG.info("Distributed copy completed for {} files", srcs.length);
+ } catch (Exception e) {
+ throw new CrunchRuntimeException("Unable to move files through
distcp from " + src + " to " + path, e);
+ }
+ } else {
+ LOG.info("No files found at [{}], not attempting to copy HFiles", src);
+ }
+ } else {
+ LOG.info(
+ "Source and destination are in the same file system, performing
rename of {} files from [{}] to [{}]",
+ new Object[] { srcs.length, src, path });
+
+ for (Path s : srcs) {
+ Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
+ srcFs.rename(s, d);
+ }
+ }
+ dstFs.create(getSuccessIndicator(), true).close();
+ LOG.info("Created success indicator file");
+ }
+
+ @Override
public String toString() {
return "HFile(" + path + ")";
}
diff --git a/pom.xml b/pom.xml
index 11b87fd..ca689ed 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,12 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>