mehakmeet commented on a change in pull request #3101:
URL: https://github.com/apache/hadoop/pull/3101#discussion_r668765888



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.IOUtils;
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static 
org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
+
+/**
+ * <p>Implementation of CopyFromLocalOperation</p>
+ * <p>
+ *     This operation copies a file or directory (recursively) from a local
+ *     FS to an object store. Initially, this operation has been developed for
+ *     S3 (s3a) interaction, however, there's minimal work needed for it to
+ *     work with other stores.
+ * </p>
+ * <p>How the uploading of files works:</p>
+ * <ul>
+ *     <li> all source files and directories are scanned through;</li>
+ *     <li> the LARGEST_N_FILES start uploading; </li>
+ *     <li> the remaining files are shuffled and uploaded; </li>
+ *     <li>
+ *         any remaining empty directory is uploaded too to preserve local
+ *         tree structure.
+ *     </li>
+ * </ul>
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+  /**
+   * Largest N files to be uploaded first
+   */
+  private static final int LARGEST_N_FILES = 5;
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CopyFromLocalOperation.class);
+
+  /**
+   * Callbacks to be used by this operation for external / IO actions
+   */
+  private final CopyFromLocalOperationCallbacks callbacks;
+
+  /**
+   * Delete source after operation finishes
+   */
+  private final boolean deleteSource;
+
+  /**
+   * Overwrite destination files / folders
+   */
+  private final boolean overwrite;
+
+  /**
+   * Source path to file / directory
+   */
+  private final Path source;
+
+  /**
+   * Async operations executor
+   */
+  private final ListeningExecutorService executor;
+
+  /**
+   * Destination path
+   */
+  private Path destination;
+
+  /**
+   * Destination file status
+   */
+  private FileStatus destStatus;
+
+  public CopyFromLocalOperation(
+      final StoreContext storeContext,
+      Path source,
+      Path destination,
+      boolean deleteSource,
+      boolean overwrite,
+      CopyFromLocalOperationCallbacks callbacks) {
+    super(storeContext);
+    this.callbacks = callbacks;
+    this.deleteSource = deleteSource;
+    this.overwrite = overwrite;
+    this.source = source;
+    this.destination = destination;
+    this.executor = MoreExecutors.listeningDecorator(
+        storeContext.createThrottledExecutor(1)

Review comment:
       why the capacity of executor is 1? Maybe a comment to explain that as 
well. 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);

Review comment:
       The message should indicate what went wrong rather than what is being 
performed, like "uploaded file not found" or src + " copied, not found in 
destination".  

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+    mkdirs(destination);
+
+    fs.copyFromLocalFile(source, destination);
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsNonExistentDirectory()
+      throws Throwable {
+    describe("Source is a file and destination directory does not exist. " +
+        "Copy operation should still work.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+
+    fs.copyFromLocalFile(source, destination);
+    assertPathExists("Destination should exist.", destination);
+  }
+
+  @Test
+  public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable {
+    describe("Source is a directory with files, copy should copy all" +
+        " dir contents to source");
+    String firstChild = "childOne";
+    String secondChild = "childTwo";
+    File parent = createTempDirectory("parent");
+    File root = parent.getParentFile();
+    File childFile = createTempFile(parent, firstChild, firstChild);
+    File secondChildFile = createTempFile(parent, secondChild, secondChild);
+
+    copyFromLocal(parent, false);
+
+    assertPathExists("Parent directory not copied", fileToPath(parent));
+    assertFileTextEquals(fileToPath(childFile, root), firstChild);
+    assertFileTextEquals(fileToPath(secondChildFile, root), secondChild);
+  }
+
+  @Test
+  public void testSrcIsEmptyDirWithCopySuccessful() throws Throwable {
+    describe("Source is an empty directory, copy should succeed");
+    File source = createTempDirectory("source");
+    Path dest = copyFromLocal(source, false);
+
+    assertPathExists("Empty directory not copied", dest);
+  }
+
+  @Test
+  public void testSrcIsDirWithOverwriteOptions() throws Throwable {
+    describe("Source is a directory, destination exists and" +
+        "should be overwritten.");
+    // Disabling checksum because overwriting directories does not
+    // overwrite checksums
+    FileSystem fs = getFileSystem();
+    fs.setVerifyChecksum(false);
+
+    File source = createTempDirectory("source");
+    Path sourcePath = new Path(source.toURI());
+    String contents = "test file";
+    File child = createTempFile(source, "child", contents);
+
+    Path dest = path(source.getName()).getParent();
+    fs.copyFromLocalFile(sourcePath, dest);
+    intercept(PathExistsException.class,
+        () -> fs.copyFromLocalFile(false, false,
+            sourcePath, dest));
+
+    String updated = "updated contents";
+    FileUtils.write(child, updated, ASCII);
+    fs.copyFromLocalFile(false, true, sourcePath, dest);
+
+    assertPathExists("Parent directory not copied", fileToPath(source));
+    assertFileTextEquals(fileToPath(child, source.getParentFile()),
+        updated);
+    getFileSystem().setVerifyChecksum(true);
+  }
+
+  @Test
+  public void testSrcIsDirWithDelSrcOptions() throws Throwable {
+    describe("Source is a directory, destination exists and" +
+        "should be overwritten.");
+    File source = createTempDirectory("source");
+    String contents = "child file";
+    File child = createTempFile(source, "child", contents);
+
+    copyFromLocal(source, false, true);
+    Path dest = fileToPath(child, source.getParentFile());
+
+    assertFalse("Directory not deleted", Files.exists(source.toPath()));
+    assertFileTextEquals(dest, contents);
+  }
+
+  /*
+   * The following path is being created on disk and copied over
+   * /parent/ (directory)
+   * /parent/test1.txt
+   * /parent/child/test.txt
+   * /parent/secondChild/ (directory)
+   */
+  @Test
+  public void testCopyTreeDirectoryWithoutDelete() throws Throwable {
+    File srcDir = createTempDirectory("parent");
+    File childDir = createTempDirectory(srcDir, "child");
+    File secondChild = createTempDirectory(srcDir, "secondChild");
+    File parentFile = createTempFile(srcDir, "test1", ".txt");
+    File childFile = createTempFile(childDir, "test2", ".txt");
+
+    copyFromLocal(srcDir, false, false);
+    File root = srcDir.getParentFile();
+
+    assertPathExists("Parent directory", fileToPath(srcDir));

Review comment:
       Add " not found" in these error messages. 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+    mkdirs(destination);
+
+    fs.copyFromLocalFile(source, destination);
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsNonExistentDirectory()
+      throws Throwable {
+    describe("Source is a file and destination directory does not exist. " +
+        "Copy operation should still work.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+
+    fs.copyFromLocalFile(source, destination);
+    assertPathExists("Destination should exist.", destination);
+  }
+
+  @Test
+  public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable {
+    describe("Source is a directory with files, copy should copy all" +
+        " dir contents to source");
+    String firstChild = "childOne";
+    String secondChild = "childTwo";
+    File parent = createTempDirectory("parent");
+    File root = parent.getParentFile();
+    File childFile = createTempFile(parent, firstChild, firstChild);
+    File secondChildFile = createTempFile(parent, secondChild, secondChild);
+
+    copyFromLocal(parent, false);
+
+    assertPathExists("Parent directory not copied", fileToPath(parent));
+    assertFileTextEquals(fileToPath(childFile, root), firstChild);
+    assertFileTextEquals(fileToPath(secondChildFile, root), secondChild);
+  }
+
+  @Test
+  public void testSrcIsEmptyDirWithCopySuccessful() throws Throwable {
+    describe("Source is an empty directory, copy should succeed");
+    File source = createTempDirectory("source");
+    Path dest = copyFromLocal(source, false);
+
+    assertPathExists("Empty directory not copied", dest);
+  }
+
+  @Test
+  public void testSrcIsDirWithOverwriteOptions() throws Throwable {
+    describe("Source is a directory, destination exists and" +
+        "should be overwritten.");
+    // Disabling checksum because overwriting directories does not
+    // overwrite checksums
+    FileSystem fs = getFileSystem();
+    fs.setVerifyChecksum(false);
+
+    File source = createTempDirectory("source");
+    Path sourcePath = new Path(source.toURI());
+    String contents = "test file";
+    File child = createTempFile(source, "child", contents);
+
+    Path dest = path(source.getName()).getParent();
+    fs.copyFromLocalFile(sourcePath, dest);
+    intercept(PathExistsException.class,
+        () -> fs.copyFromLocalFile(false, false,
+            sourcePath, dest));
+
+    String updated = "updated contents";
+    FileUtils.write(child, updated, ASCII);
+    fs.copyFromLocalFile(false, true, sourcePath, dest);
+
+    assertPathExists("Parent directory not copied", fileToPath(source));
+    assertFileTextEquals(fileToPath(child, source.getParentFile()),
+        updated);
+    getFileSystem().setVerifyChecksum(true);
+  }
+
+  @Test
+  public void testSrcIsDirWithDelSrcOptions() throws Throwable {
+    describe("Source is a directory, destination exists and" +
+        "should be overwritten.");

Review comment:
       change the description. 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,

Review comment:
       Maybe "Mismatch in file length" or "File length not equal". 

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CopyFromLocalOperation.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.io.IOUtils;
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static 
org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinAuditSpan;
+
+/**
+ * <p>Implementation of CopyFromLocalOperation</p>
+ * <p>
+ *     This operation copies a file or directory (recursively) from a local
+ *     FS to an object store. Initially, this operation has been developed for
+ *     S3 (s3a) interaction, however, there's minimal work needed for it to
+ *     work with other stores.
+ * </p>
+ * <p>How the uploading of files works:</p>
+ * <ul>
+ *     <li> all source files and directories are scanned through;</li>
+ *     <li> the LARGEST_N_FILES start uploading; </li>
+ *     <li> the remaining files are shuffled and uploaded; </li>
+ *     <li>
+ *         any remaining empty directory is uploaded too to preserve local
+ *         tree structure.
+ *     </li>
+ * </ul>
+ */
+public class CopyFromLocalOperation extends ExecutingStoreOperation<Void> {
+
+  /**
+   * Largest N files to be uploaded first
+   */
+  private static final int LARGEST_N_FILES = 5;

Review comment:
       any reason for this to be 5?

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));

Review comment:
       "Source file not deleted" or "Source file shouldn't exist". 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+    mkdirs(destination);
+
+    fs.copyFromLocalFile(source, destination);
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsNonExistentDirectory()
+      throws Throwable {
+    describe("Source is a file and destination directory does not exist. " +
+        "Copy operation should still work.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+

Review comment:
       Assert that "destination" doesn't exist. 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+    mkdirs(destination);
+
+    fs.copyFromLocalFile(source, destination);
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsNonExistentDirectory()
+      throws Throwable {
+    describe("Source is a file and destination directory does not exist. " +
+        "Copy operation should still work.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+
+    fs.copyFromLocalFile(source, destination);
+    assertPathExists("Destination should exist.", destination);
+  }
+
+  @Test
+  public void testSrcIsDirWithFilesAndCopySuccessful() throws Throwable {
+    describe("Source is a directory with files, copy should copy all" +
+        " dir contents to source");

Review comment:
       nit: "to destination". 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);
+    mkdirs(destination);
+
+    fs.copyFromLocalFile(source, destination);

Review comment:
       Mabe a file exists assertion here. 

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractCopyFromLocalTest.java
##########
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.contract;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractCopyFromLocalTest extends
+    AbstractFSContractTestBase {
+
+  private static final Charset ASCII = StandardCharsets.US_ASCII;
+  private File file;
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    if (file != null) {
+      file.delete();
+    }
+  }
+
+  @Test
+  public void testCopyEmptyFile() throws Throwable {
+    file = File.createTempFile("test", ".txt");
+    Path dest = copyFromLocal(file, true);
+    assertPathExists("uploaded file", dest);
+  }
+
+  @Test
+  public void testCopyFile() throws Throwable {
+    String message = "hello";
+    file = createTempFile(message);
+    Path dest = copyFromLocal(file, true);
+
+    assertPathExists("uploaded file not found", dest);
+    assertTrue("source file deleted", Files.exists(file.toPath()));
+
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(dest);
+    assertEquals("File length of " + status,
+        message.getBytes(ASCII).length, status.getLen());
+    assertFileTextEquals(dest, message);
+  }
+
+  @Test
+  public void testCopyFileNoOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    copyFromLocal(file, true);
+    intercept(PathExistsException.class,
+        () -> copyFromLocal(file, false));
+  }
+
+  @Test
+  public void testCopyFileOverwrite() throws Throwable {
+    file = createTempFile("hello");
+    Path dest = copyFromLocal(file, true);
+    String updated = "updated";
+    FileUtils.write(file, updated, ASCII);
+    copyFromLocal(file, true);
+    assertFileTextEquals(dest, updated);
+  }
+
+  @Test
+  public void testCopyMissingFile() throws Throwable {
+    describe("Copying a file that's not there should fail.");
+    file = createTempFile("test");
+    file.delete();
+    // first upload to create
+    intercept(FileNotFoundException.class, "",
+        () -> copyFromLocal(file, true));
+  }
+
+  @Test
+  public void testSourceIsFileAndDelSrcTrue() throws Throwable {
+    describe("Source is a file delSrc flag is set to true");
+
+    file = createTempFile("test");
+    copyFromLocal(file, false, true);
+
+    assertFalse("uploaded file", Files.exists(file.toPath()));
+  }
+
+  @Test
+  public void testSourceIsFileAndDestinationIsDirectory() throws Throwable {
+    describe("Source is a file and destination is a directory. File" +
+        "should be copied inside the directory.");
+
+    file = createTempFile("test");
+    Path source = new Path(file.toURI());
+    FileSystem fs = getFileSystem();
+
+    File dir = createTempDirectory("test");
+    Path destination = fileToPath(dir);
+    fs.delete(destination, false);

Review comment:
       why are we deleting the directory here? Maybe add some comments for this 
test to understand properly for certain steps. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to