bogthe commented on a change in pull request #3101:
URL: https://github.com/apache/hadoop/pull/3101#discussion_r661542408
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,241 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * TODO list:
+ * - Improve implementation to use Completable Futures
+ * - Better error handling
+ * - Add abstract class + tests for LocalFS
+ * - Add tests for this class
+ * - Add documentation for:
+ * - this class
+ * - `filesystem.md`
+ * - Clean old `innerCopyFromLocalFile` code up
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CopyFromLocalOperation.class);
+
+ private final CopyFromLocalOperationCallbacks callbacks;
+ private final boolean deleteSource;
+ private final boolean overwrite;
+ private final Path source;
+ private final Path destination;
+
+ private FileStatus dstStatus;
+
+ /**
+ * Creates the operation; only records its arguments, no work is done
+ * until {@link #execute()} is called.
+ *
+ * @param storeContext store context handed to the superclass constructor
+ * @param source path to copy from; {@link #execute()} converts it to a
+ * local {@link File} through {@code callbacks.pathToFile}
+ * @param destination path to copy to; its status is looked up through
+ * {@code callbacks.getFileStatus} in {@link #execute()}
+ * @param deleteSource when true, {@link #execute()} calls
+ * {@code callbacks.delete(source, true)} after the upload step
+ * @param overwrite flag forwarded to {@code prepareDestination}
+ * (presumably allows replacing an existing destination - confirm there)
+ * @param callbacks callbacks used for path conversion, status lookup,
+ * listing of the source and deletion of the source
+ */
+ public CopyFromLocalOperation(
+ final StoreContext storeContext,
+ Path source,
+ Path destination,
+ boolean deleteSource,
+ boolean overwrite,
+ CopyFromLocalOperationCallbacks callbacks) {
+ super(storeContext);
+ this.callbacks = callbacks;
+ this.deleteSource = deleteSource;
+ this.overwrite = overwrite;
+ this.source = source;
+ this.destination = destination;
+ }
+
+ /**
+ * Runs the copy: looks up the destination status, validates the source,
+ * prepares the destination, uploads the source and, if requested,
+ * deletes the source afterwards.
+ *
+ * @return always {@code null}
+ * @throws IOException propagated from the callbacks and helper methods
+ * @throws PathExistsException declared separately from IOException;
+ * presumably raised by {@code prepareDestination} when the destination
+ * exists and overwrite is false - TODO confirm against that method
+ */
+ @Override
+ @Retries.RetryTranslated
+ public Void execute()
+ throws IOException, PathExistsException {
+ LOG.debug("Copying local file from {} to {}", source, destination);
+ File sourceFile = callbacks.pathToFile(source);
+ // A missing destination is not an error here: it is recorded as a
+ // null dstStatus.
+ try {
+ dstStatus = callbacks.getFileStatus(destination);
+ } catch (FileNotFoundException e) {
+ dstStatus = null;
+ }
+
+ // Validation and destination preparation run before any upload starts.
+ checkSource(sourceFile);
+ prepareDestination(destination, sourceFile, overwrite);
+ uploadSourceFromFS();
+
+ // Reached only if the steps above did not throw.
+ // NOTE(review): the second argument is presumably "recursive" - confirm
+ // against CopyFromLocalOperationCallbacks#delete.
+ if (deleteSource) {
+ callbacks.delete(source, true);
+ }
+
+ return null;
+ }
+
+ private void uploadSourceFromFS()
+ throws IOException, PathExistsException {
+ RemoteIterator<LocatedFileStatus> localFiles = callbacks
+ .listStatusIterator(source, true);
+
+ // After all files are traversed, this set will contain only emptyDirs
+ Set<Path> emptyDirs = new HashSet<>();
+ List<UploadEntry> entries = new ArrayList<>();
+ while (localFiles.hasNext()) {
+ LocatedFileStatus sourceFile = localFiles.next();
+ Path sourceFilePath = sourceFile.getPath();
+
+ // Directory containing this file / directory isn't empty
+ emptyDirs.remove(sourceFilePath.getParent());
+
+ if (sourceFile.isDirectory()) {
+ emptyDirs.add(sourceFilePath);
+ continue;
+ }
+
+ Path destPath = getFinalPath(sourceFilePath);
+ // UploadEntries: have a destination path, a file size
+ entries.add(new UploadEntry(
+ sourceFilePath,
+ destPath,
+ sourceFile.getLen()));
+ }
+
+ if (localFiles instanceof Closeable) {
Review comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]