bkietz commented on a change in pull request #8187:
URL: https://github.com/apache/arrow/pull/8187#discussion_r494388617



##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -457,18 +458,52 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
         ARROW_ASSIGN_OR_RAISE(auto source,
                               
sources[i].filesystem->OpenInputStream(sources[i].path));
 
-        auto dest_dir = 
internal::GetAbstractPathParent(destinations[i].path).first;
-        if (!dest_dir.empty()) {
-          RETURN_NOT_OK(destinations[i].filesystem->CreateDir(dest_dir));
-        }
-
         ARROW_ASSIGN_OR_RAISE(
             auto destination,
             
destinations[i].filesystem->OpenOutputStream(destinations[i].path));
         return internal::CopyStream(source, destination, chunk_size);
       });
 }
 
+Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
+                 const FileSelector& source_sel,
+                 const std::shared_ptr<FileSystem>& destination_fs,
+                 const std::string& destination_base_dir, int64_t chunk_size,
+                 bool use_threads) {
+  ARROW_ASSIGN_OR_RAISE(auto source_infos, source_fs->GetFileInfo(source_sel));
+  std::vector<FileLocator> sources, destinations;
+
+  std::unordered_set<std::string> destination_dirs;
+  destination_dirs.insert(destination_base_dir);
+
+  for (const FileInfo& source_info : source_infos) {
+    auto relative = internal::RemoveAncestor(source_sel.base_dir, 
source_info.path());
+    if (!relative.has_value()) {
+      return Status::Invalid("GetFileInfo() yielded path '", 
source_info.path(),
+                             "', which is outside base dir '", 
source_sel.base_dir, "'");
+    }
+
+    auto destination_path =
+        internal::ConcatAbstractPath(destination_base_dir, 
relative->to_string());
+
+    if (source_info.IsDirectory()) {
+      destination_dirs.insert(destination_path);
+    } else if (source_info.IsFile()) {
+      sources.push_back({source_fs, source_info.path()});
+      destinations.push_back({destination_fs, destination_path});
+    }
+  }
+
+  std::vector<std::string> dirs(destination_dirs.size());
+  std::move(destination_dirs.begin(), destination_dirs.end(), dirs.begin());
+  RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
+      use_threads, static_cast<int>(dirs.size()), [&](int i) {
+        return dirs[i].empty() ? Status::OK() : 
destination_fs->CreateDir(dirs[i]);

Review comment:
       I'll remove any directories which have a descendant in the set, that 
should produce the minimal number of directories to create.

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -487,15 +489,27 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
         internal::ConcatAbstractPath(destination_base_dir, 
relative->to_string());
 
     if (source_info.IsDirectory()) {
-      destination_dirs.insert(destination_path);
+      dirs.push_back(destination_path);
     } else if (source_info.IsFile()) {
       sources.push_back({source_fs, source_info.path()});
       destinations.push_back({destination_fs, destination_path});
     }
   }
 
-  std::vector<std::string> dirs(destination_dirs.size());
-  std::move(destination_dirs.begin(), destination_dirs.end(), dirs.begin());
+  // remove directories with descendants since recursive directory creation 
will create
+  // them automatically
+  std::sort(dirs.begin(), dirs.end());
+  for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) {
+    auto descendants_end = ancestor + 1;
+
+    while (descendants_end != dirs.end() &&
+           internal::IsAncestorOf(*ancestor, *descendants_end)) {
+      ++descendants_end;
+    }
+
+    dirs.erase(ancestor, descendants_end - 1);

Review comment:
       no, `dirs.erase(ancestor, descendants_end)` would erase "a/b" too but I 
subtract 1 so it leaves the last descendant in place

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -487,15 +489,27 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
         internal::ConcatAbstractPath(destination_base_dir, 
relative->to_string());
 
     if (source_info.IsDirectory()) {
-      destination_dirs.insert(destination_path);
+      dirs.push_back(destination_path);
     } else if (source_info.IsFile()) {
       sources.push_back({source_fs, source_info.path()});
       destinations.push_back({destination_fs, destination_path});
     }
   }
 
-  std::vector<std::string> dirs(destination_dirs.size());
-  std::move(destination_dirs.begin(), destination_dirs.end(), dirs.begin());
+  // remove directories with descendants since recursive directory creation 
will create
+  // them automatically
+  std::sort(dirs.begin(), dirs.end());
+  for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) {
+    auto descendants_end = ancestor + 1;
+
+    while (descendants_end != dirs.end() &&
+           internal::IsAncestorOf(*ancestor, *descendants_end)) {
+      ++descendants_end;
+    }
+
+    dirs.erase(ancestor, descendants_end - 1);

Review comment:
       > Perhaps `PathForest` can help. Though it's not obvious how to list the 
leaves.
   I don't think so; the intent of PathForest is to visit all ancestors 
efficiently so getting a minimal set of descendants would not be efficient
   
   > `internal::IsAncestorOf(*(descendants_end - 1), *descendants_end)`
   You're right, thanks
   
   > Another problem is that you're iterating on dirs while erasing from it. 
That's not safe, I think, unless you write `ancestor = dirs.erase(ancestor, 
descendants_end - 1)`.
   You're right; vector::erase invalidates iterators *at* or after the 
beginning of the range.
   
   > Yet another issue is if you have {"a", "a/b/c"}, "a" won't be removed?
   No, I'd expect `IsAncestorOf("a", "a/b/c") so "a" should be removed.

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -487,15 +489,27 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
         internal::ConcatAbstractPath(destination_base_dir, 
relative->to_string());
 
     if (source_info.IsDirectory()) {
-      destination_dirs.insert(destination_path);
+      dirs.push_back(destination_path);
     } else if (source_info.IsFile()) {
       sources.push_back({source_fs, source_info.path()});
       destinations.push_back({destination_fs, destination_path});
     }
   }
 
-  std::vector<std::string> dirs(destination_dirs.size());
-  std::move(destination_dirs.begin(), destination_dirs.end(), dirs.begin());
+  // remove directories with descendants since recursive directory creation 
will create
+  // them automatically
+  std::sort(dirs.begin(), dirs.end());
+  for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) {
+    auto descendants_end = ancestor + 1;
+
+    while (descendants_end != dirs.end() &&
+           internal::IsAncestorOf(*ancestor, *descendants_end)) {
+      ++descendants_end;
+    }
+
+    dirs.erase(ancestor, descendants_end - 1);

Review comment:
       > Perhaps `PathForest` can help. Though it's not obvious how to list the 
leaves.
   
   I don't think so; the intent of PathForest is to visit all ancestors 
efficiently so getting a minimal set of descendants would not be efficient
   
   > `internal::IsAncestorOf(*(descendants_end - 1), *descendants_end)`
   
   You're right, thanks
   
   > Another problem is that you're iterating on dirs while erasing from it. 
That's not safe, I think, unless you write `ancestor = dirs.erase(ancestor, 
descendants_end - 1)`.
   
   You're right; vector::erase invalidates iterators *at* or after the 
beginning of the range.
   
   > Yet another issue is if you have {"a", "a/b/c"}, "a" won't be removed?
   
   No, I'd expect `IsAncestorOf("a", "a/b/c") so "a" should be removed.

##########
File path: cpp/src/arrow/filesystem/filesystem.cc
##########
@@ -487,15 +489,27 @@ Status CopyFiles(const std::shared_ptr<FileSystem>& 
source_fs,
         internal::ConcatAbstractPath(destination_base_dir, 
relative->to_string());
 
     if (source_info.IsDirectory()) {
-      destination_dirs.insert(destination_path);
+      dirs.push_back(destination_path);
     } else if (source_info.IsFile()) {
       sources.push_back({source_fs, source_info.path()});
       destinations.push_back({destination_fs, destination_path});
     }
   }
 
-  std::vector<std::string> dirs(destination_dirs.size());
-  std::move(destination_dirs.begin(), destination_dirs.end(), dirs.begin());
+  // remove directories with descendants since recursive directory creation 
will create
+  // them automatically
+  std::sort(dirs.begin(), dirs.end());
+  for (auto ancestor = dirs.begin(); ancestor != dirs.end(); ++ancestor) {
+    auto descendants_end = ancestor + 1;
+
+    while (descendants_end != dirs.end() &&
+           internal::IsAncestorOf(*ancestor, *descendants_end)) {
+      ++descendants_end;
+    }
+
+    dirs.erase(ancestor, descendants_end - 1);

Review comment:
       > Perhaps `PathForest` can help. Though it's not obvious how to list the 
leaves.
   
   I don't think so; the intent of PathForest is to visit all ancestors 
efficiently so getting a minimal set of descendants would not be efficient
   
   > `internal::IsAncestorOf(*(descendants_end - 1), *descendants_end)`
   
   You're right, thanks
   
   > Another problem is that you're iterating on dirs while erasing from it. 
That's not safe, I think, unless you write `ancestor = dirs.erase(ancestor, 
descendants_end - 1)`.
   
   You're right; vector::erase invalidates iterators *at* or after the 
beginning of the range.
   
   > Yet another issue is if you have {"a", "a/b/c"}, "a" won't be removed?
   
   No, I'd expect `IsAncestorOf("a", "a/b/c")` so "a" should be removed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to