This is an automated email from the ASF dual-hosted git repository.
thisisnic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5ecdd94515 GH-34436: [R] Bindings for JSON Dataset (#35055)
5ecdd94515 is described below
commit 5ecdd94515cb5fb7a6f479a05b84a65075707a39
Author: Nic Crane <[email protected]>
AuthorDate: Sun Jun 18 20:20:00 2023 +0100
GH-34436: [R] Bindings for JSON Dataset (#35055)
* Closes: #34436
Authored-by: Nic Crane <[email protected]>
Signed-off-by: Nic Crane <[email protected]>
---
cpp/src/arrow/dataset/type_fwd.h | 5 +++
r/DESCRIPTION | 1 +
r/NAMESPACE | 2 +
r/R/arrowExports.R | 8 ++++
r/R/dataset-factory.R | 2 +-
r/R/dataset-format.R | 74 +++++++++++++++++++++++++++++++++++-
r/R/dataset.R | 2 +-
r/_pkgdown.yml | 1 +
r/man/FragmentScanOptions.Rd | 1 +
r/man/JsonFileFormat.Rd | 25 ++++++++++++
r/man/dataset_factory.Rd | 2 +-
r/man/open_dataset.Rd | 2 +-
r/src/arrowExports.cpp | 34 +++++++++++++++++
r/src/dataset.cpp | 24 ++++++++++++
r/tests/testthat/test-dataset-json.R | 70 ++++++++++++++++++++++++++++++++++
15 files changed, 248 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h
index a7ea8d6ce9..d58781e038 100644
--- a/cpp/src/arrow/dataset/type_fwd.h
+++ b/cpp/src/arrow/dataset/type_fwd.h
@@ -72,6 +72,11 @@ class CsvFileWriter;
class CsvFileWriteOptions;
struct CsvFragmentScanOptions;
+class JsonFileFormat;
+class JsonFileWriter;
+class JsonFileWriteOptions;
+struct JsonFragmentScanOptions;
+
class IpcFileFormat;
class IpcFileWriter;
class IpcFileWriteOptions;
diff --git a/r/DESCRIPTION b/r/DESCRIPTION
index 0af4ad94ee..5f5e51b261 100644
--- a/r/DESCRIPTION
+++ b/r/DESCRIPTION
@@ -53,6 +53,7 @@ Suggests:
dplyr,
duckdb (>= 0.2.8),
hms,
+ jsonlite,
knitr,
lubridate,
pillar,
diff --git a/r/NAMESPACE b/r/NAMESPACE
index 06edcb31a8..2bf1a863a2 100644
--- a/r/NAMESPACE
+++ b/r/NAMESPACE
@@ -221,6 +221,8 @@ export(HivePartitioningFactory)
export(InMemoryDataset)
export(IpcFileFormat)
export(JoinType)
+export(JsonFileFormat)
+export(JsonFragmentScanOptions)
export(JsonParseOptions)
export(JsonReadOptions)
export(JsonTableReader)
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 6af43a6958..2d935ff871 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -732,6 +732,10 @@ dataset___CsvFileFormat__Make <- function(parse_options,
convert_options, read_o
.Call(`_arrow_dataset___CsvFileFormat__Make`, parse_options,
convert_options, read_options)
}
+dataset___JsonFileFormat__Make <- function(parse_options, read_options) {
+ .Call(`_arrow_dataset___JsonFileFormat__Make`, parse_options, read_options)
+}
+
dataset___FragmentScanOptions__type_name <- function(fragment_scan_options) {
.Call(`_arrow_dataset___FragmentScanOptions__type_name`,
fragment_scan_options)
}
@@ -740,6 +744,10 @@ dataset___CsvFragmentScanOptions__Make <-
function(convert_options, read_options
.Call(`_arrow_dataset___CsvFragmentScanOptions__Make`, convert_options,
read_options)
}
+dataset___JsonFragmentScanOptions__Make <- function(parse_options,
read_options) {
+ .Call(`_arrow_dataset___JsonFragmentScanOptions__Make`, parse_options,
read_options)
+}
+
dataset___ParquetFragmentScanOptions__Make <- function(use_buffered_stream,
buffer_size, pre_buffer) {
.Call(`_arrow_dataset___ParquetFragmentScanOptions__Make`,
use_buffered_stream, buffer_size, pre_buffer)
}
diff --git a/r/R/dataset-factory.R b/r/R/dataset-factory.R
index d59e0b99c6..adb7353a04 100644
--- a/r/R/dataset-factory.R
+++ b/r/R/dataset-factory.R
@@ -39,7 +39,7 @@ DatasetFactory <- R6Class("DatasetFactory",
)
DatasetFactory$create <- function(x,
filesystem = NULL,
- format = c("parquet", "arrow", "ipc",
"feather", "csv", "tsv", "text"),
+ format = c("parquet", "arrow", "ipc",
"feather", "csv", "tsv", "text", "json"),
partitioning = NULL,
hive_style = NA,
factory_options = list(),
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index 98507e427d..8798e0248e 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -78,12 +78,14 @@ FileFormat$create <- function(format, schema = NULL, ...) {
opt_names <- names(list(...))
if (format %in% c("csv", "text") || any(opt_names %in% c("delim",
"delimiter"))) {
CsvFileFormat$create(schema = schema, ...)
- } else if (format == c("tsv")) {
+ } else if (format == "tsv") {
CsvFileFormat$create(delimiter = "\t", schema = schema, ...)
} else if (format == "parquet") {
ParquetFileFormat$create(...)
} else if (format %in% c("ipc", "arrow", "feather")) { # These are aliases
for the same thing
dataset___IpcFileFormat__Make()
+ } else if (format == "json") {
+ JsonFileFormat$create(...)
} else {
stop("Unsupported file format: ", format, call. = FALSE)
}
@@ -113,6 +115,45 @@ ParquetFileFormat$create <- function(...,
#' @export
IpcFileFormat <- R6Class("IpcFileFormat", inherit = FileFormat)
+#' JSON dataset file format
+#'
+#' @description
+#' A `JsonFileFormat` is a [FileFormat] subclass which holds information about
how to
+#' read and parse the files included in a JSON `Dataset`.
+#'
+#' @section Factory:
+#' `JsonFileFormat$create()` can take options in the form of lists passed
through as `parse_options`,
+#' or `read_options` parameters.
+#'
+#' Available `read_options` parameters:
+#' * `use_threads`: Whether to use the global CPU thread pool. Default
`TRUE`. If `FALSE`, JSON input must end with an
+#' empty line.
+#' * `block_size`: Block size we request from the IO layer; also determines
size of chunks when `use_threads`
+#' is `TRUE`.
+#'
+#' Available `parse_options` parameters:
+#' * `newlines_in_values`: Logical: are values allowed to contain CR (`0x0d`
or `\r`) and LF (`0x0a` or `\n`)
+#' characters? (default `FALSE`)
+#'
+#' @return A `JsonFileFormat` object
+#' @rdname JsonFileFormat
+#' @name JsonFileFormat
+#' @seealso [FileFormat]
+#' @examplesIf arrow_with_dataset()
+#'
+#' @export
+JsonFileFormat <- R6Class("JsonFileFormat", inherit = FileFormat)
+JsonFileFormat$create <- function(...) {
+ dots <- list2(...)
+ parse_opt_choices <- dots[names(dots) %in%
names(formals(JsonParseOptions$create))]
+ read_opt_choices <- dots[names(dots) %in%
names(formals(JsonReadOptions$create))]
+
+ parse_options <- do.call(JsonParseOptions$create, parse_opt_choices)
+ read_options <- do.call(JsonReadOptions$create, read_opt_choices)
+ dataset___JsonFileFormat__Make(parse_options, read_options)
+}
+
+
#' CSV dataset file format
#'
#' @description
@@ -500,6 +541,8 @@ FragmentScanOptions$create <- function(format, ...) {
CsvFragmentScanOptions$create(...)
} else if (format == "parquet") {
ParquetFragmentScanOptions$create(...)
+ } else if (format == "json") {
+ JsonFragmentScanOptions$create(...)
} else {
stop("Unsupported file format: ", format, call. = FALSE)
}
@@ -532,6 +575,35 @@ ParquetFragmentScanOptions$create <-
function(use_buffered_stream = FALSE,
dataset___ParquetFragmentScanOptions__Make(use_buffered_stream, buffer_size,
pre_buffer)
}
+#' @usage NULL
+#' @format NULL
+#' @rdname FragmentScanOptions
+#' @export
+JsonFragmentScanOptions <- R6Class("JsonFragmentScanOptions", inherit =
FragmentScanOptions)
+JsonFragmentScanOptions$create <- function(...) {
+ dots <- list2(...)
+ valid_parse_options <- names(formals(JsonParseOptions$create))
+ valid_read_options <- names(formals(JsonReadOptions$create))
+ valid_options <- c(valid_parse_options, valid_read_options)
+
+ parse_opt_choices <- dots[names(dots) %in% valid_parse_options]
+ read_opt_choices <- dots[names(dots) %in% valid_read_options]
+
+ if (length(setdiff(names(dots), valid_options)) > 0) {
+ abort(
+ c(
+ paste("`JsonFragmentScanOptions` must match one or more of:",
oxford_paste(valid_options, quote_symbol = "`")),
+ i = paste("Invalid selection(s):", oxford_paste(setdiff(names(dots),
valid_options), quote_symbol = "`"))
+ )
+ )
+ }
+
+ parse_options <- do.call(JsonParseOptions$create, parse_opt_choices)
+ read_options <- do.call(JsonReadOptions$create, read_opt_choices)
+
+ dataset___JsonFragmentScanOptions__Make(parse_options, read_options)
+}
+
#' Format-specific write options
#'
#' @description
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 30d3ed5ae1..b7728ff897 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -178,7 +178,7 @@ open_dataset <- function(sources,
partitioning = hive_partition(),
hive_style = NA,
unify_schemas = NULL,
- format = c("parquet", "arrow", "ipc", "feather",
"csv", "tsv", "text"),
+ format = c("parquet", "arrow", "ipc", "feather",
"csv", "tsv", "text", "json"),
factory_options = list(),
...) {
stop_if_no_datasets()
diff --git a/r/_pkgdown.yml b/r/_pkgdown.yml
index f3711fdc3d..26a77199b5 100644
--- a/r/_pkgdown.yml
+++ b/r/_pkgdown.yml
@@ -153,6 +153,7 @@ reference:
- Scanner
- FileFormat
- CsvFileFormat
+ - JsonFileFormat
- FileWriteOptions
- FragmentScanOptions
- map_batches
diff --git a/r/man/FragmentScanOptions.Rd b/r/man/FragmentScanOptions.Rd
index ac120a6eff..79bb3ea3c3 100644
--- a/r/man/FragmentScanOptions.Rd
+++ b/r/man/FragmentScanOptions.Rd
@@ -4,6 +4,7 @@
\alias{FragmentScanOptions}
\alias{CsvFragmentScanOptions}
\alias{ParquetFragmentScanOptions}
+\alias{JsonFragmentScanOptions}
\title{Format-specific scan options}
\description{
A \code{FragmentScanOptions} holds options specific to a \code{FileFormat} and
a scan
diff --git a/r/man/JsonFileFormat.Rd b/r/man/JsonFileFormat.Rd
new file mode 100644
index 0000000000..296f8a92eb
--- /dev/null
+++ b/r/man/JsonFileFormat.Rd
@@ -0,0 +1,25 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/dataset-format.R
+\name{JsonFileFormat}
+\alias{JsonFileFormat}
+\title{JSON dataset file format}
+\value{
+A \code{JsonFileFormat} object
+}
+\description{
+A \code{JsonFileFormat} is a \link{FileFormat} subclass which holds
information about how to
+read and parse the files included in a JSON \code{Dataset}.
+}
+\section{Factory}{
+
+\code{JsonFileFormat$create()} can take options in the form of lists passed
through as \code{parse_options},
+or \code{read_options} parameters.
+}
+
+\examples{
+\dontshow{if (arrow_with_dataset()) (if (getRversion() >= "3.4") withAutoprint
else force)(\{ # examplesIf}
+\dontshow{\}) # examplesIf}
+}
+\seealso{
+\link{FileFormat}
+}
diff --git a/r/man/dataset_factory.Rd b/r/man/dataset_factory.Rd
index 86fe641cb1..7c529d66f9 100644
--- a/r/man/dataset_factory.Rd
+++ b/r/man/dataset_factory.Rd
@@ -7,7 +7,7 @@
dataset_factory(
x,
filesystem = NULL,
- format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
+ format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text",
"json"),
partitioning = NULL,
hive_style = NA,
factory_options = list(),
diff --git a/r/man/open_dataset.Rd b/r/man/open_dataset.Rd
index 07a6a1020e..94b537a1d3 100644
--- a/r/man/open_dataset.Rd
+++ b/r/man/open_dataset.Rd
@@ -10,7 +10,7 @@ open_dataset(
partitioning = hive_partition(),
hive_style = NA,
unify_schemas = NULL,
- format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text"),
+ format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text",
"json"),
factory_options = list(),
...
)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 01705bd7fa..e5fcf217d9 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1959,6 +1959,22 @@ extern "C" SEXP
_arrow_dataset___CsvFileFormat__Make(SEXP parse_options_sexp, SE
}
#endif
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
+std::shared_ptr<ds::JsonFileFormat> dataset___JsonFileFormat__Make(const
std::shared_ptr<arrow::json::ParseOptions>& parse_options, const
std::shared_ptr<arrow::json::ReadOptions>& read_options);
+extern "C" SEXP _arrow_dataset___JsonFileFormat__Make(SEXP parse_options_sexp,
SEXP read_options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const
std::shared_ptr<arrow::json::ParseOptions>&>::type
parse_options(parse_options_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::json::ReadOptions>&>::type
read_options(read_options_sexp);
+ return cpp11::as_sexp(dataset___JsonFileFormat__Make(parse_options,
read_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___JsonFileFormat__Make(SEXP parse_options_sexp,
SEXP read_options_sexp){
+ Rf_error("Cannot call dataset___JsonFileFormat__Make(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
std::string dataset___FragmentScanOptions__type_name(const
std::shared_ptr<ds::FragmentScanOptions>& fragment_scan_options);
@@ -1990,6 +2006,22 @@ extern "C" SEXP
_arrow_dataset___CsvFragmentScanOptions__Make(SEXP convert_optio
}
#endif
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
+std::shared_ptr<ds::JsonFragmentScanOptions>
dataset___JsonFragmentScanOptions__Make(const
std::shared_ptr<arrow::json::ParseOptions>& parse_options, const
std::shared_ptr<arrow::json::ReadOptions>& read_options);
+extern "C" SEXP _arrow_dataset___JsonFragmentScanOptions__Make(SEXP
parse_options_sexp, SEXP read_options_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const
std::shared_ptr<arrow::json::ParseOptions>&>::type
parse_options(parse_options_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::json::ReadOptions>&>::type
read_options(read_options_sexp);
+ return
cpp11::as_sexp(dataset___JsonFragmentScanOptions__Make(parse_options,
read_options));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___JsonFragmentScanOptions__Make(SEXP
parse_options_sexp, SEXP read_options_sexp){
+ Rf_error("Cannot call dataset___JsonFragmentScanOptions__Make(). See
https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow
C++ libraries. ");
+}
+#endif
+
// dataset.cpp
#if defined(ARROW_R_WITH_DATASET)
std::shared_ptr<ds::ParquetFragmentScanOptions>
dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t
buffer_size, bool pre_buffer);
@@ -5730,8 +5762,10 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___CsvFileWriteOptions__update", (DL_FUNC)
&_arrow_dataset___CsvFileWriteOptions__update, 2},
{ "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC)
&_arrow_dataset___IpcFileFormat__Make, 0},
{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC)
&_arrow_dataset___CsvFileFormat__Make, 3},
+ { "_arrow_dataset___JsonFileFormat__Make", (DL_FUNC)
&_arrow_dataset___JsonFileFormat__Make, 2},
{ "_arrow_dataset___FragmentScanOptions__type_name", (DL_FUNC)
&_arrow_dataset___FragmentScanOptions__type_name, 1},
{ "_arrow_dataset___CsvFragmentScanOptions__Make", (DL_FUNC)
&_arrow_dataset___CsvFragmentScanOptions__Make, 2},
+ { "_arrow_dataset___JsonFragmentScanOptions__Make", (DL_FUNC)
&_arrow_dataset___JsonFragmentScanOptions__Make, 2},
{ "_arrow_dataset___ParquetFragmentScanOptions__Make",
(DL_FUNC) &_arrow_dataset___ParquetFragmentScanOptions__Make, 3},
{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC)
&_arrow_dataset___DirectoryPartitioning, 2},
{ "_arrow_dataset___DirectoryPartitioning__MakeFactory",
(DL_FUNC) &_arrow_dataset___DirectoryPartitioning__MakeFactory, 2},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index 713c0d3b23..3a4958e4cb 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -58,6 +58,8 @@ const char* r6_class_name<ds::FileFormat>::get(
return "IpcFileFormat";
} else if (type_name == "csv") {
return "CsvFileFormat";
+ } else if (type_name == "json") {
+ return "JsonFileFormat";
} else {
return "FileFormat";
}
@@ -297,6 +299,18 @@ std::shared_ptr<ds::CsvFileFormat>
dataset___CsvFileFormat__Make(
return format;
}
+// [[dataset::export]]
+std::shared_ptr<ds::JsonFileFormat> dataset___JsonFileFormat__Make(
+ const std::shared_ptr<arrow::json::ParseOptions>& parse_options,
+ const std::shared_ptr<arrow::json::ReadOptions>& read_options) {
+ auto format = std::make_shared<ds::JsonFileFormat>();
+ auto scan_options = std::make_shared<ds::JsonFragmentScanOptions>();
+ if (read_options) scan_options->read_options = *read_options;
+ if (parse_options) scan_options->parse_options = *parse_options;
+ format->default_fragment_scan_options = std::move(scan_options);
+ return format;
+}
+
// FragmentScanOptions, CsvFragmentScanOptions, ParquetFragmentScanOptions
// [[dataset::export]]
@@ -315,6 +329,16 @@ std::shared_ptr<ds::CsvFragmentScanOptions>
dataset___CsvFragmentScanOptions__Ma
return options;
}
+// [[dataset::export]]
+std::shared_ptr<ds::JsonFragmentScanOptions>
dataset___JsonFragmentScanOptions__Make(
+ const std::shared_ptr<arrow::json::ParseOptions>& parse_options,
+ const std::shared_ptr<arrow::json::ReadOptions>& read_options) {
+ auto options = std::make_shared<ds::JsonFragmentScanOptions>();
+ options->parse_options = *parse_options;
+ options->read_options = *read_options;
+ return options;
+}
+
// [[dataset::export]]
std::shared_ptr<ds::ParquetFragmentScanOptions>
dataset___ParquetFragmentScanOptions__Make(bool use_buffered_stream, int64_t
buffer_size,
diff --git a/r/tests/testthat/test-dataset-json.R
b/r/tests/testthat/test-dataset-json.R
new file mode 100644
index 0000000000..699beacb85
--- /dev/null
+++ b/r/tests/testthat/test-dataset-json.R
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+skip_if_not_available("dataset")
+
+library(dplyr, warn.conflicts = FALSE)
+
+test_that("JSON dataset", {
+
+ # set up JSON directory for testing
+ json_dir <- make_temp_dir()
+
+ on.exit(unlink(json_dir, recursive = TRUE))
+ dir.create(file.path(json_dir, 5))
+ dir.create(file.path(json_dir, 6))
+
+ con_file1 <- file(file.path(json_dir, 5, "file1.json"), open = "wb")
+ jsonlite::stream_out(df1, con = con_file1, verbose = FALSE)
+ close(con_file1)
+
+ con_file2 <- file(file.path(json_dir, 6, "file2.json"), open = "wb")
+ jsonlite::stream_out(df2, con = con_file2, verbose = FALSE)
+ close(con_file2)
+
+ ds <- open_dataset(json_dir, format = "json", partitioning = "part")
+
+ expect_r6_class(ds$format, "JsonFileFormat")
+ expect_r6_class(ds$filesystem, "LocalFileSystem")
+ expect_identical(names(ds), c(names(df1), "part"))
+ expect_identical(dim(ds), c(20L, 7L))
+
+ expect_equal(
+ ds %>%
+ select(string = chr, integer = int, part) %>%
+ filter(integer > 6 & part == 5) %>%
+ collect() %>%
+ summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're
being parsed as int64
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+ # Collecting virtual partition column works
+ expect_equal(
+ collect(ds) %>% arrange(part) %>% pull(part),
+ c(rep(5, 10), rep(6, 10))
+ )
+})
+
+test_that("JSON Fragment scan options", {
+ options <- FragmentScanOptions$create("json")
+ expect_equal(options$type, "json")
+
+ expect_error(FragmentScanOptions$create("json", invalid_selection = TRUE),
regexp = "invalid_selection")
+
+})