steveloughran commented on a change in pull request #3101:
URL: https://github.com/apache/hadoop/pull/3101#discussion_r672423179
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -98,19 +98,19 @@
private final Path source;
/**
- * Destination path, expected to be
+ * Async operations executor
Review comment:
nit: always add a "." at the end of the first javadoc sentence - some
Java versions require it.
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3780,73 +3781,76 @@ private boolean s3Exists(final Path path, final
Set<StatusProbeEnum> probes)
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
- Path dst) throws IOException {
+ Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
- trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
- // innerCopyFromLocalFile(delSrc, overwrite, src, dst);
- super.copyFromLocalFile(delSrc, overwrite, src, dst);
- return null;
- });
+ trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
+ () -> new CopyFromLocalOperation(
+ createStoreContext(),
+ src,
+ dst,
+ delSrc,
+ overwrite,
+ createCopyFromLocalCallbacks()).execute());
}
- /**
- * The src file is on the local disk. Add it to FS at
- * the given dst name.
- *
- * This version doesn't need to create a temporary file to calculate the md5.
- * Sadly this doesn't seem to be used by the shell cp :(
- *
- * <i>HADOOP-15932:</i> this method has been unwired from
- * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
- * it is extended to list and copy whole directories.
- * delSrc indicates if the source should be removed
- * @param delSrc whether to delete the src
- * @param overwrite whether to overwrite an existing file
- * @param src Source path: must be on local filesystem
- * @param dst path
- * @throws IOException IO problem
- * @throws FileAlreadyExistsException the destination file exists and
- * overwrite==false, or if the destination is a directory.
- * @throws FileNotFoundException if the source file does not exit
- * @throws AmazonClientException failure in the AWS SDK
- * @throws IllegalArgumentException if the source path is not on the local FS
- */
- @Retries.RetryTranslated
- private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
- Path src, Path dst)
- throws IOException, FileAlreadyExistsException, AmazonClientException {
- LOG.debug("Copying local file from {} to {}", src, dst);
-
- // Since we have a local file, we don't need to stream into a temporary
file
+ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
+ createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
- File srcfile = local.pathToFile(src);
- if (!srcfile.exists()) {
- throw new FileNotFoundException("No file: " + src);
+ return new CopyFromLocalCallbacksImpl(local);
+ }
+
+ protected class CopyFromLocalCallbacksImpl implements
+ CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+ private final LocalFileSystem local;
+
+ private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+ this.local = local;
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listStatusIterator(
+ final Path path) throws IOException {
+ return local.listLocatedStatus(path);
}
- if (!srcfile.isFile()) {
- throw new FileNotFoundException("Not a file: " + src);
+
+ @Override
+ public File pathToFile(Path path) {
+ return local.pathToFile(path);
}
- try {
- FileStatus status = innerGetFileStatus(dst, false, StatusProbeEnum.ALL);
- if (!status.isFile()) {
- throw new FileAlreadyExistsException(dst + " exists and is not a
file");
- }
- if (!overwrite) {
- throw new FileAlreadyExistsException(dst + " already exists");
- }
- } catch (FileNotFoundException e) {
- // no destination, all is well
- }
- final String key = pathToKey(dst);
- final ObjectMetadata om = newObjectMetadata(srcfile.length());
- Progressable progress = null;
- PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
- invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true,
- () -> executePut(putObjectRequest, progress));
- if (delSrc) {
- local.delete(src, false);
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
Review comment:
prefer `deleteLocal()`
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+ AbstractFSContractTestBase {
+
+ private static final Charset ASCII = StandardCharsets.US_ASCII;
+ private File file;
+
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testCopyEmptyFile() throws Throwable {
+ file = File.createTempFile("test", ".txt");
+ Path dest = copyFromLocal(file, true);
+ assertPathExists("uploaded file not found", dest);
+ }
+
+ @Test
+ public void testCopyFile() throws Throwable {
+ String message = "hello";
+ file = createTempFile(message);
+ Path dest = copyFromLocal(file, true);
+
+ assertPathExists("uploaded file not found", dest);
+ assertTrue("source file deleted", Files.exists(file.toPath()));
+
+ FileSystem fs = getFileSystem();
+ FileStatus status = fs.getFileStatus(dest);
+ assertEquals("File length not equal " + status,
+ message.getBytes(ASCII).length, status.getLen());
+ assertFileTextEquals(dest, message);
+ }
+
+ @Test
+ public void testCopyFileNoOverwrite() throws Throwable {
+ file = createTempFile("hello");
+ copyFromLocal(file, true);
+ intercept(PathExistsException.class,
+ () -> copyFromLocal(file, false));
+ }
+
+ @Test
+ public void testCopyFileOverwrite() throws Throwable {
+ file = createTempFile("hello");
+ Path dest = copyFromLocal(file, true);
+ String updated = "updated";
+ FileUtils.write(file, updated, ASCII);
+ copyFromLocal(file, true);
+ assertFileTextEquals(dest, updated);
+ }
+
+ @Test
+ public void testCopyMissingFile() throws Throwable {
+ describe("Copying a file that's not there must fail.");
+ file = createTempFile("test");
+ file.delete();
+ // first upload to create
+ intercept(FileNotFoundException.class, "",
+ () -> copyFromLocal(file, true));
+ }
+
+ @Test
+ public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+ describe("Source is a file delSrc flag is set to true");
+
+ file = createTempFile("test");
+ copyFromLocal(file, false, true);
+
+ assertFalse("Source file not deleted", Files.exists(file.toPath()));
+ }
+
+ @Test
+ public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+ describe("Source is a file and destination is a directory. File" +
+ "must be copied inside the directory.");
+
+ file = createTempFile("test");
+ Path source = new Path(file.toURI());
+ FileSystem fs = getFileSystem();
+ File dir = createTempDirectory("test");
+ Path destination = fileToPath(dir);
+
+ // Make sure there's nothing already existing at destination
+ fs.delete(destination, false);
+ mkdirs(destination);
+ fs.copyFromLocalFile(source, destination);
+
+ Path expectedFile = path(dir.getName() + "/" + source.getName());
+ assertPathExists("File not copied into directory", expectedFile);
+ }
+
+ @Test
+ public void testSourceIsFileAndDestinationIsNonExistentDirectory()
+ throws Throwable {
+ describe("Source is a file and destination directory does not exist. " +
+ "Copy operation must still work.");
+
+ file = createTempFile("test");
+ Path source = new Path(file.toURI());
+ FileSystem fs = getFileSystem();
+
+ File dir = createTempDirectory("test");
+ Path destination = fileToPath(dir);
+ fs.delete(destination, false);
+ assertPathDoesNotExist("Destination not deleted", destination);
+
+ fs.copyFromLocalFile(source, destination);
+ assertPathExists("Destination doesn't exist.", destination);
+ }
+
+ @Test
+ public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable {
+ describe("Source is a directory with files, copy must copy all" +
+ " dir contents to destination");
+ String firstChild = "childOne";
+ String secondChild = "childTwo";
+ File parent = createTempDirectory("parent");
+ File root = parent.getParentFile();
+ File childFile = createTempFile(parent, firstChild, firstChild);
+ File secondChildFile = createTempFile(parent, secondChild, secondChild);
+
+ copyFromLocal(parent, false);
+
+ assertPathExists("Parent directory not copied", fileToPath(parent));
+ assertFileTextEquals(fileToPath(childFile, root), firstChild);
+ assertFileTextEquals(fileToPath(secondChildFile, root), secondChild);
+ }
+
+ @Test
+ public void testSrcIsEmptyDirWithCopySuccessful() throws Throwable {
+ describe("Source is an empty directory, copy must succeed");
+ File source = createTempDirectory("source");
+ Path dest = copyFromLocal(source, false);
+
+ assertPathExists("Empty directory not copied", dest);
+ }
+
+ @Test
+ public void testSrcIsDirWithOverwriteOptions() throws Throwable {
+ describe("Source is a directory, destination exists and" +
+ "must be overwritten.");
Review comment:
add a space. Ideally: set your IDE to warn on these.
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -161,6 +164,21 @@ public Void execute()
return null;
}
+ /**
+ * Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
+ * operation on the provided destination and updates the internal status of
+ * destPath property
+ *
+ * @throws IOException if getFileStatus fails
+ */
+ private void updateDestStatus(Path dest) throws IOException {
+ try {
+ destStatus = callbacks.getFileStatus(dest);
+ } catch (FileNotFoundException e) {
+ destStatus = null;
Review comment:
If you are working with Optional, shouldn't that be `Optional.empty()`? Or
just use null/FileStatus.
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -159,19 +158,30 @@ public void testSrcIsEmptyDirWithCopySuccessful() throws
Throwable {
public void testSrcIsDirWithOverwriteOptions() throws Throwable {
describe("Source is a directory, destination exists and" +
"should be overwritten.");
+ // Disabling checksum because overwriting directories does not
+ // overwrite checksums
+ FileSystem fs = getFileSystem();
+ fs.setVerifyChecksum(false);
+
File source = createTempDirectory("source");
- String contents = "child file";
+ Path sourcePath = new Path(source.toURI());
+ String contents = "test file";
File child = createTempFile(source, "child", contents);
- copyFromLocal(source, false);
- // TODO: Fix local FS
+ Path dest = path(source.getName()).getParent();
+ fs.copyFromLocalFile(sourcePath, dest);
intercept(PathExistsException.class,
- () -> copyFromLocal(source, false));
+ () -> fs.copyFromLocalFile(false, false,
+ sourcePath, dest));
+
+ String updated = "updated contents";
+ FileUtils.write(child, updated, ASCII);
+ fs.copyFromLocalFile(false, true, sourcePath, dest);
- copyFromLocal(source, true);
assertPathExists("Parent directory not copied", fileToPath(source));
assertFileTextEquals(fileToPath(child, source.getParentFile()),
- contents);
+ updated);
+ getFileSystem().setVerifyChecksum(true);
Review comment:
should really be in a finally clause
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3780,73 +3781,76 @@ private boolean s3Exists(final Path path, final
Set<StatusProbeEnum> probes)
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
- Path dst) throws IOException {
+ Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
- trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
- // innerCopyFromLocalFile(delSrc, overwrite, src, dst);
- super.copyFromLocalFile(delSrc, overwrite, src, dst);
- return null;
- });
+ trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
+ () -> new CopyFromLocalOperation(
+ createStoreContext(),
+ src,
+ dst,
+ delSrc,
+ overwrite,
+ createCopyFromLocalCallbacks()).execute());
}
- /**
- * The src file is on the local disk. Add it to FS at
- * the given dst name.
- *
- * This version doesn't need to create a temporary file to calculate the md5.
- * Sadly this doesn't seem to be used by the shell cp :(
- *
- * <i>HADOOP-15932:</i> this method has been unwired from
- * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
- * it is extended to list and copy whole directories.
- * delSrc indicates if the source should be removed
- * @param delSrc whether to delete the src
- * @param overwrite whether to overwrite an existing file
- * @param src Source path: must be on local filesystem
- * @param dst path
- * @throws IOException IO problem
- * @throws FileAlreadyExistsException the destination file exists and
- * overwrite==false, or if the destination is a directory.
- * @throws FileNotFoundException if the source file does not exit
- * @throws AmazonClientException failure in the AWS SDK
- * @throws IllegalArgumentException if the source path is not on the local FS
- */
- @Retries.RetryTranslated
- private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
- Path src, Path dst)
- throws IOException, FileAlreadyExistsException, AmazonClientException {
- LOG.debug("Copying local file from {} to {}", src, dst);
-
- // Since we have a local file, we don't need to stream into a temporary
file
+ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
+ createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
- File srcfile = local.pathToFile(src);
- if (!srcfile.exists()) {
- throw new FileNotFoundException("No file: " + src);
+ return new CopyFromLocalCallbacksImpl(local);
+ }
+
+ protected class CopyFromLocalCallbacksImpl implements
+ CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+ private final LocalFileSystem local;
+
+ private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+ this.local = local;
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listStatusIterator(
+ final Path path) throws IOException {
+ return local.listLocatedStatus(path);
}
- if (!srcfile.isFile()) {
- throw new FileNotFoundException("Not a file: " + src);
+
+ @Override
+ public File pathToFile(Path path) {
+ return local.pathToFile(path);
}
- try {
- FileStatus status = innerGetFileStatus(dst, false, StatusProbeEnum.ALL);
- if (!status.isFile()) {
- throw new FileAlreadyExistsException(dst + " exists and is not a
file");
- }
- if (!overwrite) {
- throw new FileAlreadyExistsException(dst + " already exists");
- }
- } catch (FileNotFoundException e) {
- // no destination, all is well
- }
- final String key = pathToKey(dst);
- final ObjectMetadata om = newObjectMetadata(srcfile.length());
- Progressable progress = null;
- PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
- invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true,
- () -> executePut(putObjectRequest, progress));
- if (delSrc) {
- local.delete(src, false);
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ return local.delete(path, recursive);
+ }
+
+ @Override
+ public void copyLocalFileFromTo(File file, Path from, Path to) throws
IOException {
+ trackDurationAndSpan(
+ OBJECT_PUT_REQUESTS,
+ to,
+ () -> {
+ final String key = pathToKey(to);
+ final ObjectMetadata om = newObjectMetadata(file.length());
+ Progressable progress = null;
+ PutObjectRequest putObjectRequest = newPutObjectRequest(key, om,
file);
+ S3AFileSystem.this.invoker.retry(
+ "putObject(" + "" + ")", to.toString(),
+ true,
+ () -> executePut(putObjectRequest, progress));
+
+ return null;
+ });
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return S3AFileSystem.this.getFileStatus(f);
+ }
+
+ @Override
+ public boolean createEmptyDir(Path path) throws IOException {
+ return S3AFileSystem.this.mkdirs(path);
Review comment:
Now that we are doing audit span tracking, we have to make sure we don't call
an `@AuditEntryPoint` from internal code, as that will lose the current audit span.
Best to invoke that operation directly, passing in whatever store context
the copy operation has:
```java
return trackDuration(getDurationTrackerFactory(),
INVOCATION_MKDIRS.getSymbol(),
new MkdirOperation(
storeContext,
path,
createMkdirOperationCallbacks()));
```
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFSCopyFromLocal.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractCopyFromLocalTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
+import org.junit.Test;
+
+import java.io.File;
Review comment:
nit: check import ordering here
##########
File path:
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+ AbstractFSContractTestBase {
+
+ private static final Charset ASCII = StandardCharsets.US_ASCII;
+ private File file;
+
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ if (file != null) {
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testCopyEmptyFile() throws Throwable {
+ file = File.createTempFile("test", ".txt");
+ Path dest = copyFromLocal(file, true);
+ assertPathExists("uploaded file not found", dest);
+ }
+
+ @Test
+ public void testCopyFile() throws Throwable {
+ String message = "hello";
+ file = createTempFile(message);
+ Path dest = copyFromLocal(file, true);
+
+ assertPathExists("uploaded file not found", dest);
+ assertTrue("source file deleted", Files.exists(file.toPath()));
+
+ FileSystem fs = getFileSystem();
+ FileStatus status = fs.getFileStatus(dest);
+ assertEquals("File length not equal " + status,
+ message.getBytes(ASCII).length, status.getLen());
+ assertFileTextEquals(dest, message);
+ }
+
+ @Test
+ public void testCopyFileNoOverwrite() throws Throwable {
+ file = createTempFile("hello");
+ copyFromLocal(file, true);
+ intercept(PathExistsException.class,
+ () -> copyFromLocal(file, false));
+ }
+
+ @Test
+ public void testCopyFileOverwrite() throws Throwable {
+ file = createTempFile("hello");
+ Path dest = copyFromLocal(file, true);
+ String updated = "updated";
+ FileUtils.write(file, updated, ASCII);
+ copyFromLocal(file, true);
+ assertFileTextEquals(dest, updated);
+ }
+
+ @Test
+ public void testCopyMissingFile() throws Throwable {
+ describe("Copying a file that's not there must fail.");
+ file = createTempFile("test");
+ file.delete();
+ // first upload to create
+ intercept(FileNotFoundException.class, "",
+ () -> copyFromLocal(file, true));
+ }
+
+ @Test
+ public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+ describe("Source is a file delSrc flag is set to true");
+
+ file = createTempFile("test");
+ copyFromLocal(file, false, true);
+
+ assertFalse("Source file not deleted", Files.exists(file.toPath()));
+ }
+
+ @Test
+ public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+ describe("Source is a file and destination is a directory. File" +
+ "must be copied inside the directory.");
Review comment:
nit: add space. Ideally, cut line down at "File" and leave the space
after `directory.`
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -144,10 +144,13 @@ public Void execute()
throws IOException, PathExistsException {
LOG.debug("Copying local file from {} to {}", source, destination);
File sourceFile = callbacks.pathToFile(source);
- try {
- dstStatus = callbacks.getFileStatus(destination);
- } catch (FileNotFoundException e) {
- dstStatus = null;
+ updateDestStatus(destination);
+
+ // Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
+ if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
Review comment:
may trigger an NPE
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3780,73 +3781,76 @@ private boolean s3Exists(final Path path, final
Set<StatusProbeEnum> probes)
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
- Path dst) throws IOException {
+ Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
- trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
- // innerCopyFromLocalFile(delSrc, overwrite, src, dst);
- super.copyFromLocalFile(delSrc, overwrite, src, dst);
- return null;
- });
+ trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
+ () -> new CopyFromLocalOperation(
+ createStoreContext(),
+ src,
+ dst,
+ delSrc,
+ overwrite,
+ createCopyFromLocalCallbacks()).execute());
}
- /**
- * The src file is on the local disk. Add it to FS at
- * the given dst name.
- *
- * This version doesn't need to create a temporary file to calculate the md5.
- * Sadly this doesn't seem to be used by the shell cp :(
- *
- * <i>HADOOP-15932:</i> this method has been unwired from
- * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
- * it is extended to list and copy whole directories.
- * delSrc indicates if the source should be removed
- * @param delSrc whether to delete the src
- * @param overwrite whether to overwrite an existing file
- * @param src Source path: must be on local filesystem
- * @param dst path
- * @throws IOException IO problem
- * @throws FileAlreadyExistsException the destination file exists and
- * overwrite==false, or if the destination is a directory.
- * @throws FileNotFoundException if the source file does not exit
- * @throws AmazonClientException failure in the AWS SDK
- * @throws IllegalArgumentException if the source path is not on the local FS
- */
- @Retries.RetryTranslated
- private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
- Path src, Path dst)
- throws IOException, FileAlreadyExistsException, AmazonClientException {
- LOG.debug("Copying local file from {} to {}", src, dst);
-
- // Since we have a local file, we don't need to stream into a temporary
file
+ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
+ createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
- File srcfile = local.pathToFile(src);
- if (!srcfile.exists()) {
- throw new FileNotFoundException("No file: " + src);
+ return new CopyFromLocalCallbacksImpl(local);
+ }
+
+ protected class CopyFromLocalCallbacksImpl implements
+ CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+ private final LocalFileSystem local;
+
+ private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+ this.local = local;
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listStatusIterator(
+ final Path path) throws IOException {
+ return local.listLocatedStatus(path);
}
- if (!srcfile.isFile()) {
- throw new FileNotFoundException("Not a file: " + src);
+
+ @Override
+ public File pathToFile(Path path) {
Review comment:
prefer `pathToLocalFile()`
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
+
+/**
+ * Implementation of CopyFromLocalOperation.
+ * <p>
+ * This operation copies a file or directory (recursively) from a local
+ * FS to an object store. Initially, this operation has been developed for
+ * S3 (s3a) interaction, however, there's minimal work needed for it to
+ * work with other stores.
+ * </p>
+ * <p>How the uploading of files works:</p>
+ * <ul>
+ * <li> all source files and directories are scanned through;</li>
+ * <li> the LARGEST_N_FILES start uploading; </li>
+ * <li> the remaining files are shuffled and uploaded; </li>
+ * <li>
+ * any remaining empty directory is uploaded too to preserve local
+ * tree structure.
+ * </li>
+ * </ul>
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+ /**
+ * Largest N files to be uploaded first.
+ */
+ private static final int LARGEST_N_FILES = 5;
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CopyFromLocalOperation.class);
+
+ /**
+ * Callbacks to be used by this operation for external / IO actions
+ */
+ private final CopyFromLocalOperationCallbacks callbacks;
+
+ /**
+ * Delete source after operation finishes
+ */
+ private final boolean deleteSource;
+
+ /**
+ * Overwrite destination files / folders
+ */
+ private final boolean overwrite;
+
+ /**
+ * Source path to file / directory
+ */
+ private final Path source;
+
+ /**
+ * Async operations executor
+ */
+ private final ListeningExecutorService executor;
+
+ /**
+ * Destination path
+ */
+ private Path destination;
+
+ /**
+ * Destination file status
+ */
+ private FileStatus destStatus;
+
+ public CopyFromLocalOperation(
+ final StoreContext storeContext,
+ Path source,
+ Path destination,
+ boolean deleteSource,
+ boolean overwrite,
+ CopyFromLocalOperationCallbacks callbacks) {
+ super(storeContext);
+ this.callbacks = callbacks;
+ this.deleteSource = deleteSource;
+ this.overwrite = overwrite;
+ this.source = source;
+ this.destination = destination;
+
+ // Capacity of 1 is a safe default for now since transfer manager can also
+ // spawn threads when uploading bigger files.
+ this.executor = MoreExecutors.listeningDecorator(
+ storeContext.createThrottledExecutor(1)
+ );
+ }
+
+ /**
+ * Executes the {@link CopyFromLocalOperation}.
+ *
+ * @throws IOException - if there are any failures with upload or deletion
+ * of files. Check {@link CopyFromLocalOperationCallbacks} for specifics.
+ * @throws PathExistsException - if the path exists and no overwrite flag
+ * is set OR if the source is file and destination is a directory
+ */
+ @Override
+ @Retries.RetryTranslated
+ public Void execute()
+ throws IOException, PathExistsException {
+ LOG.debug("Copying local file from {} to {}", source, destination);
+ File sourceFile = callbacks.pathToFile(source);
+ updateDestStatus(destination);
+
+ // Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
+ if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
+ && sourceFile.isDirectory()) {
+ destination = new Path(destination, sourceFile.getName());
+ LOG.debug("Destination updated to: {}", destination);
+ updateDestStatus(destination);
+ }
+
+ checkSource(sourceFile);
+ checkDestination(destination, sourceFile, overwrite);
+ uploadSourceFromFS();
+
+ if (deleteSource) {
+ callbacks.delete(source, true);
+ }
+
+ return null;
+ }
+
+ /**
+ * Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
+ * operation on the provided destination and updates the internal status of
+ * destPath property
+ *
+ * @throws IOException if getFileStatus fails
+ */
+ private void updateDestStatus(Path dest) throws IOException {
+ try {
+ destStatus = callbacks.getFileStatus(dest);
+ } catch (FileNotFoundException e) {
+ destStatus = null;
+ }
+ }
+
+ /**
+ * Starts async upload operations for files. Creating an empty directory
+ * classifies as a "file upload".
+ *
+ * Check {@link CopyFromLocalOperation} for details on the order of
+ * operations.
+ *
+ * @throws IOException - if listing or upload fail
+ */
+ private void uploadSourceFromFS() throws IOException {
+ RemoteIterator<LocatedFileStatus> localFiles = listFilesAndDirs(source);
+ List<CompletableFuture<Void>> activeOps = new ArrayList<>();
+
+ // After all files are traversed, this set will contain only emptyDirs
+ Set<Path> emptyDirs = new HashSet<>();
+ List<UploadEntry> entries = new ArrayList<>();
+ while (localFiles.hasNext()) {
+ LocatedFileStatus sourceFile = localFiles.next();
+ Path sourceFilePath = sourceFile.getPath();
+
+ // Directory containing this file / directory isn't empty
+ emptyDirs.remove(sourceFilePath.getParent());
+
+ if (sourceFile.isDirectory()) {
+ emptyDirs.add(sourceFilePath);
+ continue;
+ }
+
+ Path destPath = getFinalPath(sourceFilePath);
+ // UploadEntries: have a destination path, a file size
+ entries.add(new UploadEntry(
+ sourceFilePath,
+ destPath,
+ sourceFile.getLen()));
+ }
+
+ if (localFiles instanceof Closeable) {
+ IOUtils.closeStream((Closeable) localFiles);
+ }
+
+ // Sort all upload entries based on size
+ entries.sort(new ReverseComparator(new UploadEntry.SizeComparator()));
+
+ // Take only top most N entries and upload
+ final int sortedUploadsCount = Math.min(LARGEST_N_FILES, entries.size());
+ List<UploadEntry> markedForUpload = new ArrayList<>();
+
+ for (int uploadNo = 0; uploadNo < sortedUploadsCount; uploadNo++) {
+ UploadEntry uploadEntry = entries.get(uploadNo);
+ File file = callbacks.pathToFile(uploadEntry.source);
+ activeOps.add(submitUpload(file, uploadEntry));
+ markedForUpload.add(uploadEntry);
+ }
+
+ // No files found, it's empty source directory
+ if (entries.isEmpty()) {
+ emptyDirs.add(source);
+ }
+
+ // Shuffle all remaining entries and upload them
+ entries.removeAll(markedForUpload);
+ Collections.shuffle(entries);
+ for (UploadEntry uploadEntry : entries) {
+ File file = callbacks.pathToFile(uploadEntry.source);
+ activeOps.add(submitUpload(file, uploadEntry));
+ }
+
+ for (Path emptyDir : emptyDirs) {
+ Path emptyDirPath = getFinalPath(emptyDir);
+ activeOps.add(submitCreateEmptyDir(emptyDirPath));
+ }
+
+ waitForCompletion(activeOps);
+ }
+
+ /**
+ * Async call to create an empty directory.
+ *
+ * @param dir directory path
+ * @return the submitted future
+ */
+ private CompletableFuture<Void> submitCreateEmptyDir(Path dir) {
+ return submit(executor, callableWithinAuditSpan(
+ getAuditSpan(), () -> {
+ callbacks.createEmptyDir(dir);
+ return null;
+ }
+ ));
+ }
+
+ /**
+ * Async call to upload a file.
+ *
+ * @param file - File to be uploaded
+ * @param uploadEntry - Upload entry holding the source and destination
+ * @return the submitted future
+ */
+ private CompletableFuture<Void> submitUpload(
+ File file,
+ UploadEntry uploadEntry) {
+ return submit(executor, callableWithinAuditSpan(
+ getAuditSpan(), () -> {
+ callbacks.copyLocalFileFromTo(
+ file,
+ uploadEntry.source,
+ uploadEntry.destination);
+ return null;
+ }
+ ));
+ }
+
+ /**
+ * Checks the source before upload starts.
+ *
+ * @param src - Source file
+ * @throws FileNotFoundException - if the file isn't found
+ */
+ private void checkSource(File src)
+ throws FileNotFoundException {
+ if (!src.exists()) {
+ throw new FileNotFoundException("No file: " + src.getPath());
+ }
+ }
+
+ /**
+ * Check the destination path and make sure it's compatible with the source,
+ * i.e. source and destination are both files / directories.
+ *
+ * @param dest - Destination path
+ * @param src - Source file
+ * @param overwrite - Should source overwrite destination
+ * @throws PathExistsException - If the destination path exists and no
+ * overwrite flag is set
+ * @throws FileAlreadyExistsException - If source is file and destination is path
+ */
+ private void checkDestination(
+ Path dest,
+ File src,
+ boolean overwrite) throws PathExistsException,
+ FileAlreadyExistsException {
+ if (!getDestStatus().isPresent()) {
+ return;
+ }
+
+ if (src.isDirectory() && getDestStatus().get().isFile()) {
+ throw new FileAlreadyExistsException(
+ "Source '" + src.getPath() + "' is directory and " +
+ "destination '" + dest + "' is file");
+ }
+
+ if (!overwrite) {
+ throw new PathExistsException(dest + " already exists");
+ }
+ }
+
+ /**
+ * Get the final path of a source file with regards to its destination.
+ *
+ * @param src - source path
+ * @return - the final path for the source file to be uploaded to
+ * @throws PathIOException - if a relative path can't be created
+ */
+ private Path getFinalPath(Path src) throws PathIOException {
+ URI currentSrcUri = src.toUri();
+ URI relativeSrcUri = source.toUri().relativize(currentSrcUri);
+ if (relativeSrcUri.equals(currentSrcUri)) {
+ throw new PathIOException("Cannot get relative path for URI:"
+ + relativeSrcUri);
+ }
+
+ Optional<FileStatus> status = getDestStatus();
+ if (!relativeSrcUri.getPath().isEmpty()) {
+ return new Path(destination, relativeSrcUri.getPath());
+ } else if (status.isPresent() && status.get().isDirectory()) {
+ // file to dir
+ return new Path(destination, src.getName());
+ } else {
+ // file to file
+ return destination;
+ }
+ }
+
+ private Optional<FileStatus> getDestStatus() {
+ return Optional.ofNullable(destStatus);
+ }
+
+ /**
+ * {@link RemoteIterator} which lists all of the files and directories for
+ * a given path. It's strikingly similar to
+ * {@link org.apache.hadoop.fs.LocalFileSystem#listFiles(Path, boolean)}
+ * however with the small addition that it includes directories.
+ *
+ * @param path - Path to list files and directories from
+ * @return - an iterator
+ * @throws IOException - if listing of a path file fails
+ */
+ private RemoteIterator<LocatedFileStatus> listFilesAndDirs(Path path)
+ throws IOException {
+ return new RemoteIterator<LocatedFileStatus>() {
+ private final Stack<RemoteIterator<LocatedFileStatus>> iterators =
+ new Stack<>();
+ private RemoteIterator<LocatedFileStatus> current =
+ callbacks.listStatusIterator(path);
+ private LocatedFileStatus curFile;
+
+ @Override
+ public boolean hasNext() throws IOException {
+ while (curFile == null) {
+ if (current.hasNext()) {
+ handleFileStat(current.next());
+ } else if (!iterators.empty()) {
+ current = iterators.pop();
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Process the input stat.
+ * If it is a file or directory return the file stat.
+ * If it is a directory, traverse the directory;
+ * @param stat input status
+ * @throws IOException if any IO error occurs
+ */
+ private void handleFileStat(LocatedFileStatus stat)
+ throws IOException {
+ if (stat.isFile()) { // file
+ curFile = stat;
+ } else { // directory
+ curFile = stat;
+ iterators.push(current);
+ current = callbacks.listStatusIterator(stat.getPath());
+ }
+ }
+
+ @Override
+ public LocatedFileStatus next() throws IOException {
+ if (hasNext()) {
+ LocatedFileStatus result = curFile;
+ curFile = null;
+ return result;
+ }
+ throw new java.util.NoSuchElementException("No more entry in "
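Review comment:
Prefer to import `java.util.NoSuchElementException` at the top of the file rather than using the fully qualified class name inline.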
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.CompletableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
+
+/**
+ * Implementation of CopyFromLocalOperation.
+ * <p>
+ * This operation copies a file or directory (recursively) from a local
+ * FS to an object store. Initially, this operation has been developed for
+ * S3 (s3a) interaction, however, there's minimal work needed for it to
+ * work with other stores.
+ * </p>
+ * <p>How the uploading of files works:</p>
+ * <ul>
+ * <li> all source files and directories are scanned through;</li>
+ * <li> the LARGEST_N_FILES start uploading; </li>
+ * <li> the remaining files are shuffled and uploaded; </li>
+ * <li>
+ * any remaining empty directory is uploaded too to preserve local
+ * tree structure.
+ * </li>
+ * </ul>
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+ /**
+ * Largest N files to be uploaded first.
+ */
+ private static final int LARGEST_N_FILES = 5;
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ CopyFromLocalOperation.class);
+
+ /**
+ * Callbacks to be used by this operation for external / IO actions
+ */
+ private final CopyFromLocalOperationCallbacks callbacks;
+
+ /**
+ * Delete source after operation finishes
+ */
+ private final boolean deleteSource;
+
+ /**
+ * Overwrite destination files / folders
+ */
+ private final boolean overwrite;
+
+ /**
+ * Source path to file / directory
+ */
+ private final Path source;
+
+ /**
+ * Async operations executor
+ */
+ private final ListeningExecutorService executor;
+
+ /**
+ * Destination path
+ */
+ private Path destination;
+
+ /**
+ * Destination file status
+ */
+ private FileStatus destStatus;
+
+ public CopyFromLocalOperation(
+ final StoreContext storeContext,
+ Path source,
+ Path destination,
+ boolean deleteSource,
+ boolean overwrite,
+ CopyFromLocalOperationCallbacks callbacks) {
+ super(storeContext);
+ this.callbacks = callbacks;
+ this.deleteSource = deleteSource;
+ this.overwrite = overwrite;
+ this.source = source;
+ this.destination = destination;
+
+ // Capacity of 1 is a safe default for now since transfer manager can also
+ // spawn threads when uploading bigger files.
+ this.executor = MoreExecutors.listeningDecorator(
+ storeContext.createThrottledExecutor(1)
+ );
+ }
+
+ /**
+ * Executes the {@link CopyFromLocalOperation}.
+ *
+ * @throws IOException - if there are any failures with upload or deletion
+ * of files. Check {@link CopyFromLocalOperationCallbacks} for specifics.
+ * @throws PathExistsException - if the path exists and no overwrite flag
+ * is set OR if the source is file and destination is a directory
+ */
+ @Override
+ @Retries.RetryTranslated
+ public Void execute()
+ throws IOException, PathExistsException {
+ LOG.debug("Copying local file from {} to {}", source, destination);
+ File sourceFile = callbacks.pathToFile(source);
+ updateDestStatus(destination);
+
+ // Handles bar/ -> foo/ => foo/bar and bar/ -> foo/bar/ => foo/bar/bar
+ if (getDestStatus().isPresent() && getDestStatus().get().isDirectory()
+ && sourceFile.isDirectory()) {
+ destination = new Path(destination, sourceFile.getName());
+ LOG.debug("Destination updated to: {}", destination);
+ updateDestStatus(destination);
+ }
+
+ checkSource(sourceFile);
+ checkDestination(destination, sourceFile, overwrite);
+ uploadSourceFromFS();
+
+ if (deleteSource) {
+ callbacks.delete(source, true);
+ }
+
+ return null;
+ }
+
+ /**
+ * Does a {@link CopyFromLocalOperationCallbacks#getFileStatus(Path)}
+ * operation on the provided destination and updates the internal status of
+ * destPath property
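Review comment:
This should read "destStatus field": the javadoc says "destPath property", but the method updates the `destStatus` field.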
##########
File path:
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -3780,73 +3781,76 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
- Path dst) throws IOException {
+ Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
- trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst, () -> {
- // innerCopyFromLocalFile(delSrc, overwrite, src, dst);
- super.copyFromLocalFile(delSrc, overwrite, src, dst);
- return null;
- });
+ trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
+ () -> new CopyFromLocalOperation(
+ createStoreContext(),
+ src,
+ dst,
+ delSrc,
+ overwrite,
+ createCopyFromLocalCallbacks()).execute());
}
- /**
- * The src file is on the local disk. Add it to FS at
- * the given dst name.
- *
- * This version doesn't need to create a temporary file to calculate the md5.
- * Sadly this doesn't seem to be used by the shell cp :(
- *
- * <i>HADOOP-15932:</i> this method has been unwired from
- * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until
- * it is extended to list and copy whole directories.
- * delSrc indicates if the source should be removed
- * @param delSrc whether to delete the src
- * @param overwrite whether to overwrite an existing file
- * @param src Source path: must be on local filesystem
- * @param dst path
- * @throws IOException IO problem
- * @throws FileAlreadyExistsException the destination file exists and
- * overwrite==false, or if the destination is a directory.
- * @throws FileNotFoundException if the source file does not exit
- * @throws AmazonClientException failure in the AWS SDK
- * @throws IllegalArgumentException if the source path is not on the local FS
- */
- @Retries.RetryTranslated
- private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
- Path src, Path dst)
- throws IOException, FileAlreadyExistsException, AmazonClientException {
- LOG.debug("Copying local file from {} to {}", src, dst);
-
- // Since we have a local file, we don't need to stream into a temporary file
+ protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
+ createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
- File srcfile = local.pathToFile(src);
- if (!srcfile.exists()) {
- throw new FileNotFoundException("No file: " + src);
+ return new CopyFromLocalCallbacksImpl(local);
+ }
+
+ protected class CopyFromLocalCallbacksImpl implements
+ CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
+ private final LocalFileSystem local;
+
+ private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
+ this.local = local;
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listStatusIterator(
Review comment:
Prefer the name `listLocalStatusIterator()` for this callback method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]