This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b808fbb84a GH-42173: [R][C++] Writing partitioned dataset on S3 fails
if ListBucket is not allowed for the user (#47599)
b808fbb84a is described below
commit b808fbb84a8ffaca6c7047ee66d8fa5c933e8104
Author: Simon Elbaz <[email protected]>
AuthorDate: Tue Oct 21 11:01:07 2025 +0200
GH-42173: [R][C++] Writing partitioned dataset on S3 fails if ListBucket is
not allowed for the user (#47599)
### Rationale for this change
This PR lets the user choose not to create directories in the bucket
before writing the dataset.
If the `create_directory` option is set to FALSE, no verification will
be made by R arrow.
The S3 storage itself will verify whether the directory exists and whether
the user has the right to modify it.
This way, no `ListBucket` or `HeadBucket` calls are necessary to perform the
write operation.
```
df |> arrow::write_dataset(
minio$path(paste0("smartsla-bucket/rarrow/")),
partitioning = "qualitative",
create_directory = FALSE,
format = "parquet"
)
```
### What changes are included in this PR?
`create_directory` is now available to the user in the `write_dataset`
function.
Before this PR, this option was automatically set to TRUE (by default).
### Are these changes tested?
Yes
### Are there any user-facing changes?
No, the default value for `create_directory` is still TRUE.
* GitHub Issue: #42173
Lead-authored-by: Simon ELBAZ <[email protected]>
Co-authored-by: Simon Elbaz <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
r/R/arrowExports.R | 4 ++--
r/R/dataset-write.R | 7 ++++++-
r/man/write_dataset.Rd | 5 +++++
r/src/arrowExports.cpp | 11 ++++++-----
r/src/compute-exec.cpp | 3 ++-
5 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index a988cfb4af..fafb5ccecd 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -468,8 +468,8 @@ ExecNode_Scan <- function(plan, dataset, filter,
projection) {
.Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, projection)
}
-ExecPlan_Write <- function(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group) {
- invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema,
file_write_options, filesystem, base_dir, partitioning, basename_template,
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group))
+ExecPlan_Write <- function(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory) {
+ invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, schema,
file_write_options, filesystem, base_dir, partitioning, basename_template,
existing_data_behavior, max_partitions, max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group, create_directory))
}
ExecNode_Filter <- function(input, filter) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 3d6daf65e0..7598b0f184 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -67,6 +67,10 @@
#' group and when this number of rows is exceeded, it is split and the next set
#' of rows is written to the next group. This value must be set such that it is
#' greater than `min_rows_per_group`. Default is 1024 * 1024.
+#' @param create_directory whether to create the directories written into.
+#' Requires appropriate permissions on the storage backend. If set to FALSE,
+#' directories are assumed to be already present if writing on a classic
+#' hierarchical filesystem. Default is TRUE
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are:
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -132,6 +136,7 @@ write_dataset <- function(dataset,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
+ create_directory = TRUE,
...) {
format <- match.arg(format)
if (format %in% c("feather", "ipc")) {
@@ -224,7 +229,7 @@ write_dataset <- function(dataset,
partitioning, basename_template,
existing_data_behavior, max_partitions,
max_open_files, max_rows_per_file,
- min_rows_per_group, max_rows_per_group
+ min_rows_per_group, max_rows_per_group, create_directory
)
}
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index 34cffefbce..bc6edef16c 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -17,6 +17,7 @@ write_dataset(
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
+ create_directory = TRUE,
...
)
}
@@ -81,6 +82,10 @@ group and when this number of rows is exceeded, it is split
and the next set
of rows is written to the next group. This value must be set such that it is
greater than \code{min_rows_per_group}. Default is 1024 * 1024.}
+\item{create_directory}{activates directory creation before writing. Needs
ListBucket
+and HeadBucket ACL to perform checks before directory creation. If set to
FALSE, no ACL checks
+will be carried on relying on the native storage checks. Default is TRUE}
+
\item{...}{additional format-specific arguments. For available Parquet
options, see \code{\link[=write_parquet]{write_parquet()}}. The available
Feather options are:
\itemize{
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index c31cc6dc9c..4f4056b6b6 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1084,8 +1084,8 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP
dataset_sexp, SEXP fil
// compute-exec.cpp
#if defined(ARROW_R_WITH_DATASET)
-void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const
std::shared_ptr<acero::ExecNode>& final_node, const
std::shared_ptr<arrow::Schema>& schema, const
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions, uint32_t max_open_files [...]
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp){
+void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>& plan, const
std::shared_ptr<acero::ExecNode>& final_node, const
std::shared_ptr<arrow::Schema>& schema, const
std::shared_ptr<ds::FileWriteOptions>& file_write_options, const
std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const
std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
arrow::dataset::ExistingDataBehavior existing_data_behavior, int
max_partitions, uint32_t max_open_files [...]
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
BEGIN_CPP11
arrow::r::Input<const std::shared_ptr<acero::ExecPlan>&>::type
plan(plan_sexp);
arrow::r::Input<const std::shared_ptr<acero::ExecNode>&>::type
final_node(final_node_sexp);
@@ -1101,12 +1101,13 @@ BEGIN_CPP11
arrow::r::Input<uint64_t>::type
max_rows_per_file(max_rows_per_file_sexp);
arrow::r::Input<uint64_t>::type
min_rows_per_group(min_rows_per_group_sexp);
arrow::r::Input<uint64_t>::type
max_rows_per_group(max_rows_per_group_sexp);
- ExecPlan_Write(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group);
+ arrow::r::Input<bool>::type create_directory(create_directory_sexp);
+ ExecPlan_Write(plan, final_node, schema, file_write_options,
filesystem, base_dir, partitioning, basename_template, existing_data_behavior,
max_partitions, max_open_files, max_rows_per_file, min_rows_per_group,
max_rows_per_group, create_directory);
return R_NilValue;
END_CPP11
}
#else
-extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp){
+extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp,
SEXP schema_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP
base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP
max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp,
SEXP max_rows_per_group_sexp, SEXP create_directory_sexp){
Rf_error("Cannot call ExecPlan_Write(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
}
#endif
@@ -5814,7 +5815,7 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_ExecNode_output_schema", (DL_FUNC)
&_arrow_ExecNode_output_schema, 1},
{ "_arrow_ExecNode_has_ordered_batches", (DL_FUNC)
&_arrow_ExecNode_has_ordered_batches, 1},
{ "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4},
- { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write,
14},
+ { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write,
15},
{ "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter,
2},
{ "_arrow_ExecNode_Project", (DL_FUNC)
&_arrow_ExecNode_Project, 3},
{ "_arrow_ExecNode_Aggregate", (DL_FUNC)
&_arrow_ExecNode_Aggregate, 3},
diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp
index d0c50315c2..4191e44862 100644
--- a/r/src/compute-exec.cpp
+++ b/r/src/compute-exec.cpp
@@ -318,7 +318,7 @@ void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>&
plan,
arrow::dataset::ExistingDataBehavior
existing_data_behavior,
int max_partitions, uint32_t max_open_files,
uint64_t max_rows_per_file, uint64_t min_rows_per_group,
- uint64_t max_rows_per_group) {
+ uint64_t max_rows_per_group, bool create_directory) {
arrow::dataset::internal::Initialize();
// TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R
@@ -335,6 +335,7 @@ void ExecPlan_Write(const std::shared_ptr<acero::ExecPlan>&
plan,
opts.max_rows_per_file = max_rows_per_file;
opts.min_rows_per_group = min_rows_per_group;
opts.max_rows_per_group = max_rows_per_group;
+ opts.create_dir = create_directory;
ds::WriteNodeOptions options(std::move(opts));
options.custom_schema = std::move(schema);