This is an automated email from the ASF dual-hosted git repository. kszucs pushed a commit to branch maint-6.0.x in repository https://gitbox.apache.org/repos/asf/arrow.git
commit b0acbc6312d21a645a541bfcb19e582bae356891 Author: Weston Pace <[email protected]> AuthorDate: Wed Oct 27 14:18:57 2021 -0400 ARROW-14480: [R] Expose arrow::dataset::ExistingDataBehavior to R This only adds the one option so we can restore backwards compatibility for CRAN compliance. Closes #11552 from westonpace/feature/ARROW-14480--expose-new-dataset-writer-opts-to-r Lead-authored-by: Weston Pace <[email protected]> Co-authored-by: Jonathan Keane <[email protected]> Co-authored-by: Neal Richardson <[email protected]> Signed-off-by: Neal Richardson <[email protected]> --- cpp/src/arrow/dataset/dataset_writer.cc | 5 ++-- cpp/src/arrow/dataset/dataset_writer_test.cc | 6 ++--- cpp/src/arrow/dataset/file_base.h | 14 +--------- cpp/src/arrow/dataset/type_fwd.h | 12 +++++++++ r/R/arrowExports.R | 4 +-- r/R/dataset-write.R | 17 +++++++++++- r/man/arrow-package.Rd | 6 ++++- r/man/write_dataset.Rd | 10 +++++++ r/src/arrowExports.cpp | 11 ++++---- r/src/dataset.cpp | 4 ++- r/tests/testthat/test-dataset-write.R | 40 ++++++++++++++++++++++++++++ 11 files changed, 101 insertions(+), 28 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 12b7858..a61f32c 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -300,7 +300,8 @@ class DatasetWriterDirectoryQueue : public util::AsyncDestroyable { init_future_ = DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] { RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_)); - if (write_options_.existing_data_behavior == kDeleteMatchingPartitions) { + if (write_options_.existing_data_behavior == + ExistingDataBehavior::kDeleteMatchingPartitions) { return write_options_.filesystem->DeleteDirContents(directory_); } return Status::OK(); @@ -358,7 +359,7 @@ Status ValidateBasenameTemplate(util::string_view basename_template) { } Status EnsureDestinationValid(const FileSystemDatasetWriteOptions& 
options) { - if (options.existing_data_behavior == kError) { + if (options.existing_data_behavior == ExistingDataBehavior::kError) { fs::FileSelector selector; selector.base_dir = options.base_dir; selector.recursive = true; diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index e3fac05..bf38c2f 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -284,7 +284,7 @@ TEST_F(DatasetWriterTestFixture, DeleteExistingData) { fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")})); filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); write_options_.filesystem = filesystem_; - write_options_.existing_data_behavior = kDeleteMatchingPartitions; + write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions; EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); @@ -302,7 +302,7 @@ TEST_F(DatasetWriterTestFixture, PartitionedDeleteExistingData) { fs::File("testdir/part1/bar.arrow")})); filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); write_options_.filesystem = filesystem_; - write_options_.existing_data_behavior = kDeleteMatchingPartitions; + write_options_.existing_data_behavior = ExistingDataBehavior::kDeleteMatchingPartitions; EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "part0"); AssertFinished(queue_fut); @@ -321,7 +321,7 @@ TEST_F(DatasetWriterTestFixture, LeaveExistingData) { fs::File("testdir/chunk-5.arrow"), fs::File("testdir/blah.txt")})); filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); write_options_.filesystem = filesystem_; - write_options_.existing_data_behavior = kOverwriteOrIgnore; + write_options_.existing_data_behavior = 
ExistingDataBehavior::kOverwriteOrIgnore; EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); AssertFinished(queue_fut); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 3c7b825..9113691 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -343,18 +343,6 @@ class ARROW_DS_EXPORT FileWriter { fs::FileLocator destination_locator_; }; -/// \brief Controls what happens if files exist in an output directory during a dataset -/// write -enum ExistingDataBehavior : int8_t { - /// Deletes all files in a directory the first time that directory is encountered - kDeleteMatchingPartitions, - /// Ignores existing files, overwriting any that happen to have the same name as an - /// output file - kOverwriteOrIgnore, - /// Returns an error if there are any files or subdirectories in the output directory - kError, -}; - /// \brief Options for writing a dataset. struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { /// Options for individual fragment writing. @@ -388,7 +376,7 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { uint64_t max_rows_per_file = 0; /// Controls what happens if an output directory already exists. - ExistingDataBehavior existing_data_behavior = kError; + ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError; /// Callback to be invoked against all FileWriters before /// they are finalized with FileWriter::Finish(). 
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index ad1a299..78748a3 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -52,6 +52,18 @@ class FileSystemDataset; class FileSystemDatasetFactory; struct FileSystemDatasetWriteOptions; +/// \brief Controls what happens if files exist in an output directory during a dataset +/// write +enum class ExistingDataBehavior : int8_t { + /// Deletes all files in a directory the first time that directory is encountered + kDeleteMatchingPartitions, + /// Ignores existing files, overwriting any that happen to have the same name as an + /// output file + kOverwriteOrIgnore, + /// Returns an error if there are any files or subdirectories in the output directory + kError, +}; + class InMemoryDataset; class CsvFileFormat; diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index f5f2dd7..014b164 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -604,8 +604,8 @@ dataset___ScanTask__get_batches <- function(scan_task) { .Call(`_arrow_dataset___ScanTask__get_batches`, scan_task) } -dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner) { - invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner)) +dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior) { + invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior)) } dataset___Scanner__TakeRows <- function(scanner, indices) { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 95c7f7b..3a98357 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -38,6 +38,16 @@ #' will yield `"part-0.feather", ...`. 
#' @param hive_style logical: write partition segments as Hive-style #' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`. +#' @param existing_data_behavior The behavior to use when there is already data +#' in the destination directory. Must be one of "overwrite", "error", or +#' "delete_matching". +#' - "overwrite" (the default): any new files created will overwrite +#' existing files. +#' - "error": the operation will fail if the destination directory is not +#' empty. +#' - "delete_matching": the writer will delete any existing partitions +#' if data is going to be written to those partitions and will leave alone +#' partitions to which no data is written. #' @param ... additional format-specific arguments. For available Parquet #' options, see [write_parquet()]. The available Feather options are #' - `use_legacy_format` logical: write data formatted so that Arrow libraries @@ -97,6 +107,7 @@ write_dataset <- function(dataset, partitioning = dplyr::group_vars(dataset), basename_template = paste0("part-{i}.", as.character(format)), hive_style = TRUE, + existing_data_behavior = c("overwrite", "error", "delete_matching"), ...) { format <- match.arg(format) if (inherits(dataset, "arrow_dplyr_query")) { @@ -122,8 +133,12 @@ write_dataset <- function(dataset, path_and_fs <- get_path_and_filesystem(path) options <- FileWriteOptions$create(format, table = scanner, ...)
+ existing_data_behavior_opts <- c("delete_matching", "overwrite", "error") + existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L + dataset___Dataset__Write( options, path_and_fs$fs, path_and_fs$path, - partitioning, basename_template, scanner + partitioning, basename_template, scanner, + existing_data_behavior ) } diff --git a/r/man/arrow-package.Rd b/r/man/arrow-package.Rd index 122f768..0217621 100644 --- a/r/man/arrow-package.Rd +++ b/r/man/arrow-package.Rd @@ -6,7 +6,11 @@ \alias{arrow-package} \title{arrow: Integration to 'Apache' 'Arrow'} \description{ -'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. This package provides an interface to the 'Arrow C++' library. +'Apache' 'Arrow' <https://arrow.apache.org/> is a cross-language + development platform for in-memory data. It specifies a standardized + language-independent columnar memory format for flat and hierarchical data, + organized for efficient analytic operations on modern hardware. This + package provides an interface to the 'Arrow C++' library. } \seealso{ Useful links: diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd index 219cc83..76bbaf7 100644 --- a/r/man/write_dataset.Rd +++ b/r/man/write_dataset.Rd @@ -11,6 +11,7 @@ write_dataset( partitioning = dplyr::group_vars(dataset), basename_template = paste0("part-{i}.", as.character(format)), hive_style = TRUE, + existing_data_behavior = c("overwrite", "error", "delete_matching"), ... ) } @@ -38,6 +39,15 @@ will yield \verb{"part-0.feather", ...}.} \item{hive_style}{logical: write partition segments as Hive-style (\code{key1=value1/key2=value2/file.ext}) or as just bare values. 
Default is \code{TRUE}.} +\item{existing_data_behavior}{The behavior to use when there is already data +in the destination directory. Must be one of overwrite, error, or +delete_matching. When this is set to "overwrite" (the default) then any +new files created will overwrite existing files. When this is set to +"error" then the operation will fail if the destination directory is not +empty. When this is set to "delete_matching" then the writer will delete +any existing partitions if data is going to be written to those partitions +and will leave alone partitions to which no data is written.} + \item{...}{additional format-specific arguments. For available Parquet options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are \itemize{ diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index c446b77..5872aa4 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2396,8 +2396,8 @@ extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ // dataset.cpp #if defined(ARROW_R_WITH_DATASET) -void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner); -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){ +void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior); +extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP
file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){ BEGIN_CPP11 arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp); arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp); @@ -2405,12 +2405,13 @@ BEGIN_CPP11 arrow::r::Input<const std::shared_ptr<ds::Partitioning>&>::type partitioning(partitioning_sexp); arrow::r::Input<std::string>::type basename_template(basename_template_sexp); arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp); - dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner); + arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp); + dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp){ +extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp){ Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); } #endif @@ -7319,7 +7320,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, - { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 6}, + { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 7}, { "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, { "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, { "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 544c9d8..7e384aa 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -516,9 +516,11 @@ void dataset___Dataset__Write( const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, - const std::shared_ptr<ds::Scanner>& scanner) { + const std::shared_ptr<ds::Scanner>& scanner, + arrow::dataset::ExistingDataBehavior existing_data_behavior) { ds::FileSystemDatasetWriteOptions opts; opts.file_write_options = file_write_options; + opts.existing_data_behavior = existing_data_behavior; opts.filesystem = filesystem; opts.base_dir = base_dir; opts.partitioning = partitioning; diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index 705103f..8e7c077 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -139,6 +139,46 @@ test_that("Writing a dataset: Parquet->Parquet (default)", { ) }) +test_that("Writing a dataset: existing data behavior", { + # This test does not work on Windows because unlink does not 
immediately + # delete the data. + skip_on_os("windows") + ds <- open_dataset(csv_dir, partitioning = "part", format = "csv") + dst_dir <- make_temp_dir() + write_dataset(ds, dst_dir, format = "feather", partitioning = "int") + expect_true(dir.exists(dst_dir)) + + check_dataset <- function() { + new_ds <- open_dataset(dst_dir, format = "feather") + + expect_equal( + new_ds %>% + select(string = chr, integer = int) %>% + filter(integer > 6 & integer < 11) %>% + collect() %>% + summarize(mean = mean(integer)), + df1 %>% + select(string = chr, integer = int) %>% + filter(integer > 6) %>% + summarize(mean = mean(integer)) + ) + } + + check_dataset() + # By default we should overwrite + write_dataset(ds, dst_dir, format = "feather", partitioning = "int") + check_dataset() + write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "overwrite") + check_dataset() + expect_error( + write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error"), + "directory is not empty" + ) + unlink(dst_dir, recursive = TRUE) + write_dataset(ds, dst_dir, format = "feather", partitioning = "int", existing_data_behavior = "error") + check_dataset() +}) + test_that("Writing a dataset: no format specified", { dst_dir <- make_temp_dir() write_dataset(example_data, dst_dir)
