[ https://issues.apache.org/jira/browse/HADOOP-17139?focusedWorklogId=617072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-617072 ]
ASF GitHub Bot logged work on HADOOP-17139:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jun/21 14:42
Start Date: 30/Jun/21 14:42
Worklog Time Spent: 10m
Work Description: bogthe commented on a change in pull request #3101:
URL: https://github.com/apache/hadoop/pull/3101#discussion_r661542408
##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,241 @@
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * TODO list:
+ * - Improve implementation to use Completable Futures
+ * - Better error handling
+ * - Add abstract class + tests for LocalFS
+ * - Add tests for this class
+ * - Add documentation
+ *   - This class
+ *   - `filesystem.md`
+ * - Clean old `innerCopyFromLocalFile` code up
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CopyFromLocalOperation.class);
+
+  private final CopyFromLocalOperationCallbacks callbacks;
+  private final boolean deleteSource;
+  private final boolean overwrite;
+  private final Path source;
+  private final Path destination;
+
+  private FileStatus dstStatus;
+
+  public CopyFromLocalOperation(
+      final StoreContext storeContext,
+      Path source,
+      Path destination,
+      boolean deleteSource,
+      boolean overwrite,
+      CopyFromLocalOperationCallbacks callbacks) {
+    super(storeContext);
+    this.callbacks = callbacks;
+    this.deleteSource = deleteSource;
+    this.overwrite = overwrite;
+    this.source = source;
+    this.destination = destination;
+  }
+
+  @Override
+  @Retries.RetryTranslated
+  public Void execute()
+      throws IOException, PathExistsException {
+    LOG.debug("Copying local file from {} to {}", source, destination);
+    File sourceFile = callbacks.pathToFile(source);
+    try {
+      dstStatus = callbacks.getFileStatus(destination);
+    } catch (FileNotFoundException e) {
+      dstStatus = null;
+    }
+
+    checkSource(sourceFile);
+    prepareDestination(destination, sourceFile, overwrite);
+    uploadSourceFromFS();
+
+    if (deleteSource) {
+      callbacks.delete(source, true);
+    }
+
+    return null;
+  }
+
+  private void uploadSourceFromFS()
+      throws IOException, PathExistsException {
+    RemoteIterator<LocatedFileStatus> localFiles = callbacks
+        .listStatusIterator(source, true);
+
+    // After all files are traversed, this set will contain only emptyDirs
+    Set<Path> emptyDirs = new HashSet<>();
+    List<UploadEntry> entries = new ArrayList<>();
+    while (localFiles.hasNext()) {
+      LocatedFileStatus sourceFile = localFiles.next();
+      Path sourceFilePath = sourceFile.getPath();
+
+      // Directory containing this file / directory isn't empty
+      emptyDirs.remove(sourceFilePath.getParent());
+
+      if (sourceFile.isDirectory()) {
+        emptyDirs.add(sourceFilePath);
+        continue;
+      }
+
+      Path destPath = getFinalPath(sourceFilePath);
+      // UploadEntries: have a destination path, a file size
+      entries.add(new UploadEntry(
+          sourceFilePath,
+          destPath,
+          sourceFile.getLen()));
+    }
+
+    if (localFiles instanceof Closeable) {
Review comment:
Added
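
For readers following the diff, the empty-directory bookkeeping in uploadSourceFromFS() can be tried out in isolation. The following is only a minimal sketch under stated assumptions, not the code from the pull request: ListingEntry, CloseableListing and EmptyDirSketch are hypothetical stand-ins for LocatedFileStatus and RemoteIterator, paths are plain '/'-separated strings, and the listing is assumed to report a directory before anything inside it (if a child arrived first, its parent would wrongly stay in the set). Closing the iterator in a finally block is also an assumption, since the quoted diff is cut off at the Closeable check.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical listing entry; a stand-in for LocatedFileStatus. */
final class ListingEntry {
  final String path;
  final boolean directory;

  ListingEntry(String path, boolean directory) {
    this.path = path;
    this.directory = directory;
  }
}

/** Hypothetical iterator that also implements Closeable. */
final class CloseableListing implements Iterator<ListingEntry>, Closeable {
  private final Iterator<ListingEntry> inner;
  boolean closed;

  CloseableListing(List<ListingEntry> entries) {
    this.inner = entries.iterator();
  }

  @Override public boolean hasNext() { return inner.hasNext(); }
  @Override public ListingEntry next() { return inner.next(); }
  @Override public void close() { closed = true; }
}

final class EmptyDirSketch {

  /** Parent of a '/'-separated path, or null for a top-level entry. */
  static String parentOf(String path) {
    int slash = path.lastIndexOf('/');
    return slash > 0 ? path.substring(0, slash) : null;
  }

  /**
   * Collects file paths into {@code files} and returns the directories
   * that ended up with no children. Assumes parents are listed first.
   */
  static Set<String> findEmptyDirs(Iterator<ListingEntry> listing,
      List<String> files) throws IOException {
    Set<String> emptyDirs = new HashSet<>();
    try {
      while (listing.hasNext()) {
        ListingEntry entry = listing.next();
        // The directory containing this entry is not empty.
        emptyDirs.remove(parentOf(entry.path));
        if (entry.directory) {
          // Treated as empty until one of its children shows up.
          emptyDirs.add(entry.path);
        } else {
          files.add(entry.path);
        }
      }
    } finally {
      // Release the listing if the iterator holds resources.
      if (listing instanceof Closeable) {
        ((Closeable) listing).close();
      }
    }
    return emptyDirs;
  }

  public static void main(String[] args) throws IOException {
    List<String> files = new ArrayList<>();
    CloseableListing listing = new CloseableListing(Arrays.asList(
        new ListingEntry("src", true),
        new ListingEntry("src/a", true),
        new ListingEntry("src/a/f.txt", false),
        new ListingEntry("src/b", true)));
    System.out.println("empty dirs: " + findEmptyDirs(listing, files));
    System.out.println("files: " + files + ", closed: " + listing.closed);
  }
}
```

The single HashSet keeps the traversal to one pass: each entry costs one remove and at most one add, and no second listing is needed to find the directories that still have to be created at the destination.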
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
Issue Time Tracking
-------------------
Worklog Id: (was: 617072)
Time Spent: 1h 50m (was: 1h 40m)
> Re-enable optimized copyFromLocal implementation in S3AFileSystem
> -----------------------------------------------------------------
>
> Key: HADOOP-17139
> URL: https://issues.apache.org/jira/browse/HADOOP-17139
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.3.0, 3.2.1
> Reporter: Sahil Takiar
> Assignee: Bogdan Stolojan
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> It looks like HADOOP-15932 disabled the optimized copyFromLocal
> implementation in S3A for correctness reasons. innerCopyFromLocalFile should
> be fixed and re-enabled. The current implementation uses
> FileSystem.copyFromLocal which will open an input stream from the local fs
> and an output stream to the destination fs, and then call IOUtils.copyBytes.
> With default configs, this will cause S3A to read the file into memory, write
> it back to a file on the local fs, and then when the file is closed, upload
> it to S3.
> The optimized version of copyFromLocal in innerCopyFromLocalFile directly
> creates a PutObjectRequest with the local file as the input.
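
The difference between the two routes described in the issue can be illustrated without any S3 dependency. This is a rough sketch under stated assumptions, not S3A code: FakeStore is a hypothetical in-memory stand-in for the object store, putFile() plays the role of a put request that takes the local file as its input, and streamedCopy() models the generic route of copying bytes through a local buffer file before the upload. Path here is java.nio.file.Path, not the Hadoop class.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory object store; not an AWS SDK type. */
final class FakeStore {
  final Map<String, byte[]> objects = new HashMap<>();

  /** Stands in for a put request whose input is a local file. */
  void putFile(String key, Path localFile) throws IOException {
    objects.put(key, Files.readAllBytes(localFile));
  }
}

final class CopyPathsSketch {

  /**
   * Generic route: read the source through a stream, write the bytes
   * to a second local buffer file, then upload that buffer file.
   */
  static void streamedCopy(Path source, String key, FakeStore store)
      throws IOException {
    Path buffer = Files.createTempFile("copy-buffer-sketch", ".tmp");
    try {
      try (InputStream in = Files.newInputStream(source);
           OutputStream out = Files.newOutputStream(buffer)) {
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) != -1) {
          out.write(chunk, 0, n);
        }
      }
      // The upload only happens once the output side is closed.
      store.putFile(key, buffer);
    } finally {
      Files.deleteIfExists(buffer);
    }
  }

  /** Optimized route: hand the source file straight to the upload. */
  static void directCopy(Path source, String key, FakeStore store)
      throws IOException {
    store.putFile(key, source);
  }

  public static void main(String[] args) throws IOException {
    Path source = Files.createTempFile("copy-source-sketch", ".txt");
    try {
      Files.write(source, "example".getBytes(StandardCharsets.UTF_8));
      FakeStore store = new FakeStore();
      streamedCopy(source, "streamed", store);
      directCopy(source, "direct", store);
      System.out.println("objects stored: " + store.objects.size());
    } finally {
      Files.deleteIfExists(source);
    }
  }
}
```

Both routes leave the same bytes in the store; the direct route simply skips the extra read and the second local write, which is the saving the issue is after.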
--
This message was sent by Atlassian Jira
(v8.3.4#803005)