steveloughran commented on a change in pull request #3101:
URL: https://github.com/apache/hadoop/pull/3101#discussion_r672361799
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,241 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * TODO list:
+ * - Improve implementation to use Completable Futures
+ * - Better error handling
+ * - Add abstract class + tests for LocalFS
+ * - Add tests for this class
+ * - Add documentation
+ * - This class
+ * - `filesystem.md`
+ * - Clean up the old `innerCopyFromLocalFile` code
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CopyFromLocalOperation.class);
+
+ private final CopyFromLocalOperationCallbacks callbacks;
+ private final boolean deleteSource;
+ private final boolean overwrite;
+ private final Path source;
+ private final Path destination;
+
+ private FileStatus dstStatus;
+
+ public CopyFromLocalOperation(
+ final StoreContext storeContext,
+ Path source,
+ Path destination,
+ boolean deleteSource,
+ boolean overwrite,
+ CopyFromLocalOperationCallbacks callbacks) {
+ super(storeContext);
+ this.callbacks = callbacks;
+ this.deleteSource = deleteSource;
+ this.overwrite = overwrite;
+ this.source = source;
+ this.destination = destination;
+ }
+
+ @Override
+ @Retries.RetryTranslated
+ public Void execute()
+ throws IOException, PathExistsException {
+ LOG.debug("Copying local file from {} to {}", source, destination);
+ File sourceFile = callbacks.pathToFile(source);
+ try {
+ dstStatus = callbacks.getFileStatus(destination);
+ } catch (FileNotFoundException e) {
+ dstStatus = null;
+ }
+
+ checkSource(sourceFile);
+ prepareDestination(destination, sourceFile, overwrite);
+ uploadSourceFromFS();
+
+ if (deleteSource) {
+ callbacks.delete(source, true);
+ }
+
+ return null;
+ }
+
+ private void uploadSourceFromFS()
+ throws IOException, PathExistsException {
+ RemoteIterator<LocatedFileStatus> localFiles = callbacks
+ .listStatusIterator(source, true);
+
+ // After all files are traversed, this set will contain only empty directories
+ Set<Path> emptyDirs = new HashSet<>();
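+ // (A directory enters the set when it is listed, and leaves it as soon
+ // as any child of it is seen.)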
+ List<UploadEntry> entries = new ArrayList<>();
+ while (localFiles.hasNext()) {
+ LocatedFileStatus sourceFile = localFiles.next();
+ Path sourceFilePath = sourceFile.getPath();
+
+ // Directory containing this file / directory isn't empty
+ emptyDirs.remove(sourceFilePath.getParent());
+
+ if (sourceFile.isDirectory()) {
+ emptyDirs.add(sourceFilePath);
+ continue;
+ }
+
+ Path destPath = getFinalPath(sourceFilePath);
+ // Each UploadEntry records the source path, destination path and file size
+ entries.add(new UploadEntry(
+ sourceFilePath,
+ destPath,
+ sourceFile.getLen()));
+ }
+
+ if (localFiles instanceof Closeable) {
+ ((Closeable) localFiles).close();
+ }
+
+ // Sort all upload entries by size, largest first
+ entries.sort(new ReverseComparator(new UploadEntry.SizeComparator()));
+
+ int LARGEST_N_FILES = 5;
+ final int sortedUploadsCount = Math.min(LARGEST_N_FILES, entries.size());
+ List<UploadEntry> uploaded = new ArrayList<>();
+
+ // Take only the largest N entries and upload them first
+ for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) {
##########
Review comment:
If we use the xfer manager (which is the simplest option) it gets one of the
thread pools we create. It has limitations, but we haven't sat down to
replace it (yet) as it mostly works OK.

For other uses, the StoreContext executor should be the thread pool, and
it's possible to create a restricted subpool of that, which is what
S3ABlockOutputStream does.

FWIW, I am not sure (a) that the default size of that pool is right or (b)
whether we should just go for an unlimited pool. Those would be big changes,
but as most of the threads are blocked on remote IO, the pool size is
probably a needless limitation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]