emkornfield commented on a change in pull request #11996:
URL: https://github.com/apache/arrow/pull/11996#discussion_r779945233



##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -310,93 +318,107 @@ class GcsFileSystem::Impl {
   Result<FileInfo> GetFileInfo(const GcsPath& path) {
     if (path.object.empty()) {
       auto meta = client_.GetBucketMetadata(path.bucket);
-      return GetFileInfoDirectory(path, std::move(meta).status());
+      return GetFileInfoBucket(path, std::move(meta).status());
     }
     auto meta = client_.GetObjectMetadata(path.bucket, path.object);
-    if (path.object.back() == '/') {
-      return GetFileInfoDirectory(path, std::move(meta).status());
-    }
-    return GetFileInfoFile(path, meta);
+    return GetFileInfoObject(path, meta);
   }
 
   Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
     ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
+    // Adding the trailing '/' avoids problems with files named 'a', 'ab', 
'ac' which
+    // where GCS would return all of them if the prefix is 'a'.
     const auto canonical = internal::EnsureTrailingSlash(p.object);
     const auto max_depth = internal::Depth(canonical) + select.max_recursion;
     auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
     auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
-    bool found_directory = false;
     FileInfoVector result;
     for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
       if (!o.ok()) {
         if (select.allow_not_found &&
             o.status().code() == google::cloud::StatusCode::kNotFound) {
-          continue;
+          return result;
         }
         return internal::ToArrowStatus(o.status());
       }
-      found_directory = true;
       // Skip the directory itself from the results, and any result that is 
"too deep"
       // into the recursion.
       if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
         continue;
       }
       auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
-      if (o->name().back() == '/') {
-        result.push_back(
-            FileInfo(internal::EnsureTrailingSlash(path), 
FileType::Directory));
-        continue;
-      }
       result.push_back(ToFileInfo(path, *o));
     }
-    if (!found_directory && !select.allow_not_found) {
-      return Status::IOError("No such file or directory '", select.base_dir, 
"'");
+    // Finding any elements indicates the directory was found.
+    if (!result.empty() || select.allow_not_found) {
+      return result;
     }
-    return result;
+    // To find out if the directory exists we need to perform an additional 
query.
+    ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
+    if (directory.IsDirectory()) return result;
+    return Status::IOError("No such file or directory '", select.base_dir, 
"'");
   }
 
   // GCS does not have directories or folders. But folders can be emulated 
(with some
   // limitations) using marker objects.  That and listing with prefixes 
creates the
   // illusion of folders.
-  google::cloud::Status CreateDirMarker(const std::string& bucket,
-                                        util::string_view name) {
+  google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const 
std::string& bucket,
+                                                               
util::string_view name) {
     // Make the name canonical.
-    const auto canonical = internal::EnsureTrailingSlash(name);
-    return client_
-        .InsertObject(bucket, canonical, std::string(),
-                      
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
-                          "arrow/gcsfs", "directory")))
-        .status();
+    const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+    auto object = client_.InsertObject(
+        bucket, canonical, std::string(),
+        gcs::WithObjectMetadata(
+            gcs::ObjectMetadata().upsert_metadata("arrow/gcsfs", "directory")),
+        gcs::IfGenerationMatch(0));
+    if (object) return object;
+    if (object.status().code() == GcsCode::kFailedPrecondition) {
+      // The marker already exists, find out if it is a directory or a file.
+      return client_.GetObjectMetadata(bucket, canonical);
+    }
+    return object;
   }
 
-  google::cloud::Status CreateDirMarkerRecursive(const std::string& bucket,
-                                                 const std::string& object) {
-    using GcsCode = google::cloud::StatusCode;
+  static Status NotDirectoryError(const gcs::ObjectMetadata& o) {
+    return Status::IOError(
+        "Cannot create directory, it conflicts with an existing file '",
+        internal::ConcatAbstractPath(o.bucket(), o.name()), "'");
+  }
+
+  Status CreateDirMarkerRecursive(const std::string& bucket, const 
std::string& name) {
     auto get_parent = [](std::string const& path) {
       return std::move(internal::GetAbstractPathParent(path).first);
     };
-    // Maybe counterintuitively we create the markers from the most nested and 
up. Because
-    // GCS does not have directories creating `a/b/c` will succeed, even if 
`a/` or `a/b/`
-    // does not exist.  In the common case, where `a/b/` may already exist, it 
is more
-    // efficient to just create `a/b/c/` and then find out that `a/b/` was 
already there.
-    // In the case where none exists, it does not matter which order we follow.
-    auto status = CreateDirMarker(bucket, object);
-    if (status.code() == GcsCode::kAlreadyExists) return {};
-    if (status.code() == GcsCode::kNotFound) {
-      // Missing bucket, create it first ...
-      status = client_.CreateBucket(bucket, gcs::BucketMetadata()).status();
-      if (status.code() != GcsCode::kOk && status.code() != 
GcsCode::kAlreadyExists) {
-        return status;
+    // Find the list of missing parents. In the process we discover if any 
elements in
+    // the path are files, this is unavoidable as GCS does not really have 
directories.
+    std::vector<std::string> missing_parents;
+    auto dir = name;
+    for (; !dir.empty(); dir = get_parent(dir)) {
+      auto o = client_.GetObjectMetadata(bucket, dir);
+      if (o) {
+        if (IsDirectory(*o)) break;
+        return NotDirectoryError(*o);
       }
+      missing_parents.push_back(dir);
     }
-
-    for (auto parent = get_parent(object); !parent.empty(); parent = 
get_parent(parent)) {
-      status = CreateDirMarker(bucket, parent);
-      if (status.code() == GcsCode::kAlreadyExists) {
-        break;
+    if (dir.empty()) {
+      // We could not find any of the parent directories in the bucket, the 
last step is

Review comment:
       Is this consistent with what the S3 connector does? Bucket creation 
seems pretty heavyweight to do accidentally.

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -310,93 +318,107 @@ class GcsFileSystem::Impl {
   Result<FileInfo> GetFileInfo(const GcsPath& path) {
     if (path.object.empty()) {
       auto meta = client_.GetBucketMetadata(path.bucket);
-      return GetFileInfoDirectory(path, std::move(meta).status());
+      return GetFileInfoBucket(path, std::move(meta).status());
     }
     auto meta = client_.GetObjectMetadata(path.bucket, path.object);
-    if (path.object.back() == '/') {
-      return GetFileInfoDirectory(path, std::move(meta).status());
-    }
-    return GetFileInfoFile(path, meta);
+    return GetFileInfoObject(path, meta);
   }
 
   Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
     ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
+    // Adding the trailing '/' avoids problems with files named 'a', 'ab', 
'ac' which
+    // where GCS would return all of them if the prefix is 'a'.
     const auto canonical = internal::EnsureTrailingSlash(p.object);
     const auto max_depth = internal::Depth(canonical) + select.max_recursion;
     auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
     auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
-    bool found_directory = false;
     FileInfoVector result;
     for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
       if (!o.ok()) {
         if (select.allow_not_found &&
             o.status().code() == google::cloud::StatusCode::kNotFound) {
-          continue;
+          return result;
         }
         return internal::ToArrowStatus(o.status());
       }
-      found_directory = true;
       // Skip the directory itself from the results, and any result that is 
"too deep"
       // into the recursion.
       if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
         continue;
       }
       auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
-      if (o->name().back() == '/') {
-        result.push_back(
-            FileInfo(internal::EnsureTrailingSlash(path), 
FileType::Directory));
-        continue;
-      }
       result.push_back(ToFileInfo(path, *o));
     }
-    if (!found_directory && !select.allow_not_found) {
-      return Status::IOError("No such file or directory '", select.base_dir, 
"'");
+    // Finding any elements indicates the directory was found.
+    if (!result.empty() || select.allow_not_found) {
+      return result;
     }
-    return result;
+    // To find out if the directory exists we need to perform an additional 
query.
+    ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
+    if (directory.IsDirectory()) return result;
+    return Status::IOError("No such file or directory '", select.base_dir, 
"'");
   }
 
   // GCS does not have directories or folders. But folders can be emulated 
(with some
   // limitations) using marker objects.  That and listing with prefixes 
creates the
   // illusion of folders.
-  google::cloud::Status CreateDirMarker(const std::string& bucket,
-                                        util::string_view name) {
+  google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const 
std::string& bucket,
+                                                               
util::string_view name) {
     // Make the name canonical.
-    const auto canonical = internal::EnsureTrailingSlash(name);
-    return client_
-        .InsertObject(bucket, canonical, std::string(),
-                      
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
-                          "arrow/gcsfs", "directory")))
-        .status();
+    const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+    auto object = client_.InsertObject(

Review comment:
       nit: I think it would be clearer to spell out the type.

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -310,93 +318,107 @@ class GcsFileSystem::Impl {
   Result<FileInfo> GetFileInfo(const GcsPath& path) {
     if (path.object.empty()) {
       auto meta = client_.GetBucketMetadata(path.bucket);
-      return GetFileInfoDirectory(path, std::move(meta).status());
+      return GetFileInfoBucket(path, std::move(meta).status());
     }
     auto meta = client_.GetObjectMetadata(path.bucket, path.object);
-    if (path.object.back() == '/') {
-      return GetFileInfoDirectory(path, std::move(meta).status());
-    }
-    return GetFileInfoFile(path, meta);
+    return GetFileInfoObject(path, meta);
   }
 
   Result<FileInfoVector> GetFileInfo(const FileSelector& select) {
     ARROW_ASSIGN_OR_RAISE(auto p, GcsPath::FromString(select.base_dir));
+    // Adding the trailing '/' avoids problems with files named 'a', 'ab', 
'ac' which
+    // where GCS would return all of them if the prefix is 'a'.
     const auto canonical = internal::EnsureTrailingSlash(p.object);
     const auto max_depth = internal::Depth(canonical) + select.max_recursion;
     auto prefix = p.object.empty() ? gcs::Prefix() : gcs::Prefix(canonical);
     auto delimiter = select.recursive ? gcs::Delimiter() : gcs::Delimiter("/");
-    bool found_directory = false;
     FileInfoVector result;
     for (auto const& o : client_.ListObjects(p.bucket, prefix, delimiter)) {
       if (!o.ok()) {
         if (select.allow_not_found &&
             o.status().code() == google::cloud::StatusCode::kNotFound) {
-          continue;
+          return result;
         }
         return internal::ToArrowStatus(o.status());
       }
-      found_directory = true;
       // Skip the directory itself from the results, and any result that is 
"too deep"
       // into the recursion.
       if (o->name() == p.object || internal::Depth(o->name()) > max_depth) {
         continue;
       }
       auto path = internal::ConcatAbstractPath(o->bucket(), o->name());
-      if (o->name().back() == '/') {
-        result.push_back(
-            FileInfo(internal::EnsureTrailingSlash(path), 
FileType::Directory));
-        continue;
-      }
       result.push_back(ToFileInfo(path, *o));
     }
-    if (!found_directory && !select.allow_not_found) {
-      return Status::IOError("No such file or directory '", select.base_dir, 
"'");
+    // Finding any elements indicates the directory was found.
+    if (!result.empty() || select.allow_not_found) {
+      return result;
     }
-    return result;
+    // To find out if the directory exists we need to perform an additional 
query.
+    ARROW_ASSIGN_OR_RAISE(auto directory, GetFileInfo(p));
+    if (directory.IsDirectory()) return result;
+    return Status::IOError("No such file or directory '", select.base_dir, 
"'");
   }
 
   // GCS does not have directories or folders. But folders can be emulated 
(with some
   // limitations) using marker objects.  That and listing with prefixes 
creates the
   // illusion of folders.
-  google::cloud::Status CreateDirMarker(const std::string& bucket,
-                                        util::string_view name) {
+  google::cloud::StatusOr<gcs::ObjectMetadata> CreateDirMarker(const 
std::string& bucket,
+                                                               
util::string_view name) {
     // Make the name canonical.
-    const auto canonical = internal::EnsureTrailingSlash(name);
-    return client_
-        .InsertObject(bucket, canonical, std::string(),
-                      
gcs::WithObjectMetadata(gcs::ObjectMetadata().upsert_metadata(
-                          "arrow/gcsfs", "directory")))
-        .status();
+    const auto canonical = internal::RemoveTrailingSlash(name).to_string();
+    auto object = client_.InsertObject(
+        bucket, canonical, std::string(),
+        gcs::WithObjectMetadata(
+            gcs::ObjectMetadata().upsert_metadata("arrow/gcsfs", "directory")),
+        gcs::IfGenerationMatch(0));
+    if (object) return object;
+    if (object.status().code() == GcsCode::kFailedPrecondition) {
+      // The marker already exists, find out if it is a directory or a file.
+      return client_.GetObjectMetadata(bucket, canonical);
+    }
+    return object;
   }
 
-  google::cloud::Status CreateDirMarkerRecursive(const std::string& bucket,
-                                                 const std::string& object) {
-    using GcsCode = google::cloud::StatusCode;
+  static Status NotDirectoryError(const gcs::ObjectMetadata& o) {
+    return Status::IOError(
+        "Cannot create directory, it conflicts with an existing file '",
+        internal::ConcatAbstractPath(o.bucket(), o.name()), "'");
+  }
+
+  Status CreateDirMarkerRecursive(const std::string& bucket, const 
std::string& name) {
     auto get_parent = [](std::string const& path) {
       return std::move(internal::GetAbstractPathParent(path).first);
     };
-    // Maybe counterintuitively we create the markers from the most nested and 
up. Because
-    // GCS does not have directories creating `a/b/c` will succeed, even if 
`a/` or `a/b/`
-    // does not exist.  In the common case, where `a/b/` may already exist, it 
is more
-    // efficient to just create `a/b/c/` and then find out that `a/b/` was 
already there.
-    // In the case where none exists, it does not matter which order we follow.
-    auto status = CreateDirMarker(bucket, object);
-    if (status.code() == GcsCode::kAlreadyExists) return {};
-    if (status.code() == GcsCode::kNotFound) {
-      // Missing bucket, create it first ...
-      status = client_.CreateBucket(bucket, gcs::BucketMetadata()).status();
-      if (status.code() != GcsCode::kOk && status.code() != 
GcsCode::kAlreadyExists) {
-        return status;
+    // Find the list of missing parents. In the process we discover if any 
elements in
+    // the path are files, this is unavoidable as GCS does not really have 
directories.
+    std::vector<std::string> missing_parents;
+    auto dir = name;
+    for (; !dir.empty(); dir = get_parent(dir)) {
+      auto o = client_.GetObjectMetadata(bucket, dir);
+      if (o) {
+        if (IsDirectory(*o)) break;
+        return NotDirectoryError(*o);
       }
+      missing_parents.push_back(dir);
     }
-
-    for (auto parent = get_parent(object); !parent.empty(); parent = 
get_parent(parent)) {
-      status = CreateDirMarker(bucket, parent);
-      if (status.code() == GcsCode::kAlreadyExists) {
-        break;
+    if (dir.empty()) {
+      // We could not find any of the parent directories in the bucket, the 
last step is
+      // to find out if the bucket exists, and if necessary, create it
+      auto b = client_.GetBucketMetadata(bucket);

Review comment:
       Spelling out the type here would be more readable, I think.

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -445,29 +478,46 @@ class GcsFileSystem::Impl {
   }
 
   Status DeleteFile(const GcsPath& p) {
-    if (!p.object.empty() && p.object.back() == '/') {
-      return Status::IOError("The given path (" + p.full_path +
-                             ") is a directory, use DeleteDir");
+    if (!p.object.empty()) {
+      auto stat = client_.GetObjectMetadata(p.bucket, p.object);
+      if (!stat) return internal::ToArrowStatus(stat.status());
+      if (IsDirectory(*stat)) {
+        return Status::IOError("The given path '", p.full_path,
+                               "' is a directory, use DeleteDir");
+      }
     }
     return internal::ToArrowStatus(client_.DeleteObject(p.bucket, p.object));
   }
 
   Status Move(const GcsPath& src, const GcsPath& dest) {
-    if (src.full_path.empty() || src.object.empty() ||
-        src.object.back() == internal::kSep) {
+    if (src == dest) return {};

Review comment:
       nit: return Status::OK() is more idiomatic for the code base

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -659,8 +715,8 @@ Result<std::shared_ptr<io::InputStream>> 
GcsFileSystem::OpenInputStream(
 
 Result<std::shared_ptr<io::InputStream>> GcsFileSystem::OpenInputStream(
     const FileInfo& info) {
-  if (!info.IsFile()) {
-    return Status::IOError("Only files can be opened as input streams");
+  if (info.IsDirectory()) {
+    return Status::IOError("Cannot open a directory as an input stream");

Review comment:
       maybe include the path requested here?

##########
File path: cpp/src/arrow/filesystem/gcsfs.cc
##########
@@ -688,6 +744,9 @@ Result<std::shared_ptr<io::RandomAccessFile>> 
GcsFileSystem::OpenInputFile(
 
 Result<std::shared_ptr<io::RandomAccessFile>> GcsFileSystem::OpenInputFile(
     const FileInfo& info) {
+  if (info.IsDirectory()) {
+    return Status::IOError("Cannot open a directory as an input stream");

Review comment:
       Same comment as above about providing the path?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to