This is an automated email from the ASF dual-hosted git repository.
npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3694794 ARROW-8296: [C++][Dataset] Add IpcFileWriteOptions
3694794 is described below
commit 3694794bdfd0677b95b8c95681e392512f1c9237
Author: Benjamin Kietzman <[email protected]>
AuthorDate: Fri Oct 9 16:27:37 2020 -0700
ARROW-8296: [C++][Dataset] Add IpcFileWriteOptions
Closes #8389 from bkietz/8296-IpcFileWriteOptions
Lead-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Co-authored-by: Neal Richardson <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
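
For context, a minimal sketch (a hypothetical helper built only from names that appear in the
diff below; error handling collapsed into ValueOrDie() for brevity) of how the new
IpcFileWriteOptions is meant to be used when writing a dataset:

    #include <arrow/dataset/file_ipc.h>
    #include <arrow/util/checked_cast.h>
    #include <arrow/util/compression.h>
    #include <arrow/util/key_value_metadata.h>

    // Hypothetical helper: `destination` and `schema` are assumed to exist already.
    std::shared_ptr<arrow::dataset::FileWriter> MakeCompressedIpcWriter(
        std::shared_ptr<arrow::io::OutputStream> destination,
        std::shared_ptr<arrow::Schema> schema) {
      auto format = std::make_shared<arrow::dataset::IpcFileFormat>();
      auto options = arrow::internal::checked_pointer_cast<arrow::dataset::IpcFileWriteOptions>(
          format->DefaultWriteOptions());
      // Compress record batch body buffers (ZSTD here, if built with it) ...
      options->options->codec =
          arrow::util::Codec::Create(arrow::Compression::ZSTD).ValueOrDie();
      // ... and write custom key/value metadata into the file footer.
      options->metadata = arrow::key_value_metadata({{"hello", "world"}});
      return format->MakeWriter(destination, schema, options).ValueOrDie();
    }
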
---
c_glib/arrow-glib/codec.cpp | 60 ++++++++++--
c_glib/arrow-glib/codec.h | 7 ++
c_glib/arrow-glib/codec.hpp | 4 +-
c_glib/arrow-glib/input-stream.cpp | 2 +-
c_glib/arrow-glib/ipc-options.cpp | 83 ++++++++--------
c_glib/arrow-glib/output-stream.cpp | 2 +-
c_glib/test/test-codec.rb | 10 ++
c_glib/test/test-write-options.rb | 22 +----
cpp/src/arrow/dataset/file_ipc.cc | 14 ++-
cpp/src/arrow/dataset/file_ipc.h | 6 +-
cpp/src/arrow/dataset/file_ipc_test.cc | 31 ++++++
cpp/src/arrow/dataset/scanner.cc | 2 +-
cpp/src/arrow/ipc/feather.cc | 5 +-
cpp/src/arrow/ipc/metadata_internal.cc | 8 +-
cpp/src/arrow/ipc/options.h | 3 +-
cpp/src/arrow/ipc/read_write_test.cc | 8 +-
cpp/src/arrow/ipc/writer.cc | 17 ++--
cpp/src/arrow/testing/gtest_util.h | 12 +++
cpp/src/arrow/util/compression.cc | 144 ++++++++++++++--------------
cpp/src/arrow/util/compression.h | 20 +++-
cpp/src/arrow/util/compression_brotli.cc | 19 ++--
cpp/src/arrow/util/compression_bz2.cc | 4 +-
cpp/src/arrow/util/compression_lz4.cc | 8 +-
cpp/src/arrow/util/compression_snappy.cc | 4 +-
cpp/src/arrow/util/compression_test.cc | 38 ++++----
cpp/src/arrow/util/compression_zlib.cc | 4 +-
cpp/src/arrow/util/compression_zstd.cc | 19 ++--
cpp/src/arrow/util/key_value_metadata.h | 5 +-
cpp/src/arrow/util/string.cc | 8 ++
cpp/src/arrow/util/string.h | 3 +
cpp/src/parquet/printer.cc | 8 +-
python/pyarrow/includes/libarrow.pxd | 4 +-
python/pyarrow/io.pxi | 19 ----
python/pyarrow/ipc.pxi | 9 +-
r/R/arrowExports.R | 8 ++
r/R/dataset-format.R | 12 +++
r/R/dataset-write.R | 11 ++-
r/R/record-batch-writer.R | 12 ++-
r/man/RecordBatchWriter.Rd | 2 +-
r/man/write_dataset.Rd | 13 ++-
r/src/arrowExports.cpp | 39 ++++++++
r/src/dataset.cpp | 19 ++++
r/tests/testthat/test-dataset.R | 32 ++++++-
r/tests/testthat/test-record-batch-reader.R | 2 +-
44 files changed, 497 insertions(+), 265 deletions(-)
diff --git a/c_glib/arrow-glib/codec.cpp b/c_glib/arrow-glib/codec.cpp
index fdd61e7..33b3d1c 100644
--- a/c_glib/arrow-glib/codec.cpp
+++ b/c_glib/arrow-glib/codec.cpp
@@ -38,7 +38,7 @@ G_BEGIN_DECLS
*/
typedef struct GArrowCodecPrivate_ {
- arrow::util::Codec *codec;
+ std::shared_ptr<arrow::util::Codec> codec;
} GArrowCodecPrivate;
enum {
@@ -57,7 +57,7 @@ garrow_codec_finalize(GObject *object)
{
auto priv = GARROW_CODEC_GET_PRIVATE(object);
- delete priv->codec;
+ priv->codec.~shared_ptr();
G_OBJECT_CLASS(garrow_codec_parent_class)->finalize(object);
}
@@ -72,7 +72,8 @@ garrow_codec_set_property(GObject *object,
switch (prop_id) {
case PROP_CODEC:
- priv->codec = static_cast<arrow::util::Codec *>(g_value_get_pointer(value));
+ priv->codec =
+ *static_cast<std::shared_ptr<arrow::util::Codec> *>(g_value_get_pointer(value));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
@@ -96,6 +97,8 @@ garrow_codec_get_property(GObject *object,
static void
garrow_codec_init(GArrowCodec *object)
{
+ auto priv = GARROW_CODEC_GET_PRIVATE(object);
+ new(&priv->codec) std::shared_ptr<arrow::util::Codec>;
}
static void
@@ -111,7 +114,7 @@ garrow_codec_class_init(GArrowCodecClass *klass)
spec = g_param_spec_pointer("codec",
"Codec",
- "The raw arrow::util::Codec *",
+ "The raw std::shared_ptr<arrow::util::Codec> *",
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_CODEC, spec);
@@ -133,7 +136,9 @@ garrow_codec_new(GArrowCompressionType type,
auto arrow_type = garrow_compression_type_to_raw(type);
auto arrow_codec = arrow::util::Codec::Create(arrow_type);
if (garrow::check(error, arrow_codec, "[codec][new]")) {
- return garrow_codec_new_raw(arrow_codec.ValueOrDie().release());
+ std::shared_ptr<arrow::util::Codec> arrow_codec_shared =
+ std::move(*arrow_codec);
+ return garrow_codec_new_raw(&arrow_codec_shared);
} else {
return NULL;
}
@@ -151,7 +156,46 @@ const gchar *
garrow_codec_get_name(GArrowCodec *codec)
{
auto arrow_codec = garrow_codec_get_raw(codec);
- return arrow_codec->name();
+ if (!arrow_codec) {
+ return NULL;
+ }
+ return arrow_codec->name().c_str();
+}
+
+/**
+ * garrow_codec_get_compression_type:
+ * @codec: A #GArrowCodec.
+ *
+ * Returns: The compression type of the codec.
+ *
+ * Since: 2.0.0
+ */
+GArrowCompressionType
+garrow_codec_get_compression_type(GArrowCodec *codec)
+{
+ auto arrow_codec = garrow_codec_get_raw(codec);
+ if (!arrow_codec) {
+ return GARROW_COMPRESSION_TYPE_UNCOMPRESSED;
+ }
+ return garrow_compression_type_from_raw(arrow_codec->compression_type());
+}
+
+/**
+ * garrow_codec_get_compression_level:
+ * @codec: A #GArrowCodec.
+ *
+ * Returns: The compression level of the codec.
+ *
+ * Since: 2.0.0
+ */
+gint
+garrow_codec_get_compression_level(GArrowCodec *codec)
+{
+ auto arrow_codec = garrow_codec_get_raw(codec);
+ if (!arrow_codec) {
+ return arrow::util::Codec::UseDefaultCompressionLevel();
+ }
+ return arrow_codec->compression_level();
}
G_END_DECLS
@@ -207,7 +251,7 @@ garrow_compression_type_to_raw(GArrowCompressionType type)
}
GArrowCodec *
-garrow_codec_new_raw(arrow::util::Codec *arrow_codec)
+garrow_codec_new_raw(std::shared_ptr<arrow::util::Codec> *arrow_codec)
{
auto codec = GARROW_CODEC(g_object_new(GARROW_TYPE_CODEC,
"codec", arrow_codec,
@@ -215,7 +259,7 @@ garrow_codec_new_raw(arrow::util::Codec *arrow_codec)
return codec;
}
-arrow::util::Codec *
+std::shared_ptr<arrow::util::Codec>
garrow_codec_get_raw(GArrowCodec *codec)
{
auto priv = GARROW_CODEC_GET_PRIVATE(codec);
diff --git a/c_glib/arrow-glib/codec.h b/c_glib/arrow-glib/codec.h
index 5feab2b..6e177af 100644
--- a/c_glib/arrow-glib/codec.h
+++ b/c_glib/arrow-glib/codec.h
@@ -20,6 +20,7 @@
#pragma once
#include <arrow-glib/gobject-type.h>
+#include <arrow-glib/version.h>
G_BEGIN_DECLS
@@ -63,5 +64,11 @@ GArrowCodec *garrow_codec_new(GArrowCompressionType type,
GError **error);
const gchar *garrow_codec_get_name(GArrowCodec *codec);
+GARROW_AVAILABLE_IN_2_0
+GArrowCompressionType
+garrow_codec_get_compression_type(GArrowCodec *codec);
+GARROW_AVAILABLE_IN_2_0
+gint
+garrow_codec_get_compression_level(GArrowCodec *codec);
G_END_DECLS
diff --git a/c_glib/arrow-glib/codec.hpp b/c_glib/arrow-glib/codec.hpp
index 14c3ad7..f4cfaba 100644
--- a/c_glib/arrow-glib/codec.hpp
+++ b/c_glib/arrow-glib/codec.hpp
@@ -29,6 +29,6 @@ arrow::Compression::type
garrow_compression_type_to_raw(GArrowCompressionType type);
GArrowCodec *
-garrow_codec_new_raw(arrow::util::Codec *arrow_codec);
-arrow::util::Codec *
+garrow_codec_new_raw(std::shared_ptr<arrow::util::Codec> *arrow_codec);
+std::shared_ptr<arrow::util::Codec>
garrow_codec_get_raw(GArrowCodec *codec);
diff --git a/c_glib/arrow-glib/input-stream.cpp b/c_glib/arrow-glib/input-stream.cpp
index 3751d41..84904b7 100644
--- a/c_glib/arrow-glib/input-stream.cpp
+++ b/c_glib/arrow-glib/input-stream.cpp
@@ -1132,7 +1132,7 @@ garrow_compressed_input_stream_new(GArrowCodec *codec,
GArrowInputStream *raw,
GError **error)
{
- auto arrow_codec = garrow_codec_get_raw(codec);
+ auto arrow_codec = garrow_codec_get_raw(codec).get();
auto arrow_raw = garrow_input_stream_get_raw(raw);
auto arrow_stream =
arrow::io::CompressedInputStream::Make(arrow_codec, arrow_raw);
diff --git a/c_glib/arrow-glib/ipc-options.cpp b/c_glib/arrow-glib/ipc-options.cpp
index 1cddd25..b9b2c41 100644
--- a/c_glib/arrow-glib/ipc-options.cpp
+++ b/c_glib/arrow-glib/ipc-options.cpp
@@ -21,6 +21,7 @@
# include <config.h>
#endif
+#include <arrow-glib/codec.hpp>
#include <arrow-glib/enums.h>
#include <arrow-glib/ipc-options.hpp>
@@ -242,6 +243,7 @@ garrow_read_options_set_included_fields(GArrowReadOptions *options,
typedef struct GArrowWriteOptionsPrivate_ {
arrow::ipc::IpcWriteOptions options;
+ GArrowCodec *codec;
} GArrowWriteOptionsPrivate;
enum {
@@ -249,8 +251,7 @@ enum {
PROP_WRITE_OPTIONS_MAX_RECURSION_DEPTH,
PROP_WRITE_OPTIONS_ALIGNMENT,
PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT,
- PROP_WRITE_OPTIONS_COMPRESSION,
- PROP_WRITE_OPTIONS_COMPRESSION_LEVEL,
+ PROP_WRITE_OPTIONS_CODEC,
PROP_WRITE_OPTIONS_USE_THREADS,
};
@@ -264,6 +265,19 @@ G_DEFINE_TYPE_WITH_PRIVATE(GArrowWriteOptions,
GARROW_WRITE_OPTIONS(obj)))
static void
+garrow_write_options_dispose(GObject *object)
+{
+ auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
+
+ if (priv->codec) {
+ g_object_unref(priv->codec);
+ priv->codec = NULL;
+ }
+
+ G_OBJECT_CLASS(garrow_write_options_parent_class)->dispose(object);
+}
+
+static void
garrow_write_options_finalize(GObject *object)
{
auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
@@ -294,12 +308,12 @@ garrow_write_options_set_property(GObject *object,
case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT:
priv->options.write_legacy_ipc_format = g_value_get_boolean(value);
break;
- case PROP_WRITE_OPTIONS_COMPRESSION:
- priv->options.compression =
- static_cast<arrow::Compression::type>(g_value_get_enum(value));
- break;
- case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL:
- priv->options.compression_level = g_value_get_int(value);
+ case PROP_WRITE_OPTIONS_CODEC:
+ if (priv->codec) {
+ g_object_unref(priv->codec);
+ }
+ priv->codec = GARROW_CODEC(g_value_dup_object(value));
+ priv->options.codec = garrow_codec_get_raw(priv->codec);
break;
case PROP_WRITE_OPTIONS_USE_THREADS:
priv->options.use_threads = g_value_get_boolean(value);
@@ -331,11 +345,8 @@ garrow_write_options_get_property(GObject *object,
case PROP_WRITE_OPTIONS_WRITE_LEGACY_IPC_FORMAT:
g_value_set_boolean(value, priv->options.write_legacy_ipc_format);
break;
- case PROP_WRITE_OPTIONS_COMPRESSION:
- g_value_set_enum(value, priv->options.compression);
- break;
- case PROP_WRITE_OPTIONS_COMPRESSION_LEVEL:
- g_value_set_int(value, priv->options.compression_level);
+ case PROP_WRITE_OPTIONS_CODEC:
+ g_value_set_object(value, priv->codec);
break;
case PROP_WRITE_OPTIONS_USE_THREADS:
g_value_set_boolean(value, priv->options.use_threads);
@@ -352,6 +363,11 @@ garrow_write_options_init(GArrowWriteOptions *object)
auto priv = GARROW_WRITE_OPTIONS_GET_PRIVATE(object);
new(&priv->options) arrow::ipc::IpcWriteOptions;
priv->options = arrow::ipc::IpcWriteOptions::Defaults();
+ if (priv->options.codec) {
+ priv->codec = garrow_codec_new_raw(&(priv->options.codec));
+ } else {
+ priv->codec = NULL;
+ }
}
static void
@@ -359,6 +375,7 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass)
{
auto gobject_class = G_OBJECT_CLASS(klass);
+ gobject_class->dispose = garrow_write_options_dispose;
gobject_class->finalize = garrow_write_options_finalize;
gobject_class->set_property = garrow_write_options_set_property;
gobject_class->get_property = garrow_write_options_get_property;
@@ -441,42 +458,24 @@ garrow_write_options_class_init(GArrowWriteOptionsClass *klass)
spec);
/**
- * GArrowWriteOptions:compression:
+ * GArrowWriteOptions:codec:
*
* Codec to use for compressing and decompressing record batch body
* buffers. This is not part of the Arrow IPC protocol and only for
- * internal use (e.g. Feather files). May only be LZ4_FRAME and
- * ZSTD.
+ * internal use (e.g. Feather files).
*
- * Since: 1.0.0
- */
- spec = g_param_spec_enum("compression",
- "Compression",
- "Codec to use for "
- "compressing record batch body buffers.",
- GARROW_TYPE_COMPRESSION_TYPE,
- options.compression,
- static_cast<GParamFlags>(G_PARAM_READWRITE));
- g_object_class_install_property(gobject_class,
- PROP_WRITE_OPTIONS_COMPRESSION,
- spec);
-
- /**
- * GArrowWriteOptions:compression-level:
- *
- * The level for compression.
+ * May only be UNCOMPRESSED, LZ4_FRAME and ZSTD.
*
- * Since: 1.0.0
+ * Since: 2.0.0
*/
- spec = g_param_spec_int("compression-level",
- "Compression level",
- "The level for compression",
- G_MININT,
- G_MAXINT,
- options.compression_level,
- static_cast<GParamFlags>(G_PARAM_READWRITE));
+ spec = g_param_spec_object("codec",
+ "Codec",
+ "Codec to use for "
+ "compressing record batch body buffers.",
+ GARROW_TYPE_CODEC,
+ static_cast<GParamFlags>(G_PARAM_READWRITE));
g_object_class_install_property(gobject_class,
- PROP_WRITE_OPTIONS_COMPRESSION_LEVEL,
+ PROP_WRITE_OPTIONS_CODEC,
spec);
/**
diff --git a/c_glib/arrow-glib/output-stream.cpp b/c_glib/arrow-glib/output-stream.cpp
index 2c3ccaf..1619bac 100644
--- a/c_glib/arrow-glib/output-stream.cpp
+++ b/c_glib/arrow-glib/output-stream.cpp
@@ -688,7 +688,7 @@ garrow_compressed_output_stream_new(GArrowCodec *codec,
GArrowOutputStream *raw,
GError **error)
{
- auto arrow_codec = garrow_codec_get_raw(codec);
+ auto arrow_codec = garrow_codec_get_raw(codec).get();
auto arrow_raw = garrow_output_stream_get_raw(raw);
auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec,
arrow_raw);
diff --git a/c_glib/test/test-codec.rb b/c_glib/test/test-codec.rb
index 6617815..a32ec4d 100644
--- a/c_glib/test/test-codec.rb
+++ b/c_glib/test/test-codec.rb
@@ -20,4 +20,14 @@ class TestCodec < Test::Unit::TestCase
codec = Arrow::Codec.new(:gzip)
assert_equal("gzip", codec.name)
end
+
+ def test_compression_type
+ codec = Arrow::Codec.new(:gzip)
+ assert_equal(Arrow::CompressionType::GZIP, codec.compression_type)
+ end
+
+ def test_compression_level
+ codec = Arrow::Codec.new(:gzip)
+ assert_equal(9, codec.compression_level)
+ end
end
diff --git a/c_glib/test/test-write-options.rb b/c_glib/test/test-write-options.rb
index d30b78b..c528ce6 100644
--- a/c_glib/test/test-write-options.rb
+++ b/c_glib/test/test-write-options.rb
@@ -73,27 +73,15 @@ class TestWriteOptions < Test::Unit::TestCase
end
end
- sub_test_case("compression") do
+ sub_test_case("codec") do
def test_default
- assert_equal(Arrow::CompressionType::UNCOMPRESSED,
- @options.compression)
+ assert_nil(@options.codec)
end
def test_accessor
- @options.compression = :zstd
- assert_equal(Arrow::CompressionType::ZSTD,
- @options.compression)
- end
- end
-
- sub_test_case("compression-level") do
- def test_default
- assert_equal(-(2 ** 31), @options.compression_level)
- end
-
- def test_accessor
- @options.compression_level = 8
- assert_equal(8, @options.compression_level)
+ @options.codec = Arrow::Codec.new(:zstd)
+ assert_equal("zstd",
+ @options.codec.name)
end
end
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index e15de84..8bd0121 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -168,12 +168,12 @@ Result<ScanTaskIterator> IpcFileFormat::ScanFile(std::shared_ptr<ScanOptions> op
//
std::shared_ptr<FileWriteOptions> IpcFileFormat::DefaultWriteOptions() {
- std::shared_ptr<IpcFileWriteOptions> options(
+ std::shared_ptr<IpcFileWriteOptions> ipc_options(
new IpcFileWriteOptions(shared_from_this()));
- options->ipc_options =
+ ipc_options->options =
std::make_shared<ipc::IpcWriteOptions>(ipc::IpcWriteOptions::Defaults());
- return options;
+ return ipc_options;
}
Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
@@ -185,7 +185,13 @@ Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
auto ipc_options = checked_pointer_cast<IpcFileWriteOptions>(options);
- ARROW_ASSIGN_OR_RAISE(auto writer, ipc::MakeFileWriter(destination, schema));
+ // override use_threads to avoid nested parallelism
+ ipc_options->options->use_threads = false;
+
+ ARROW_ASSIGN_OR_RAISE(auto writer,
+ ipc::MakeFileWriter(destination, schema, *ipc_options->options,
+ ipc_options->metadata));
+
return std::shared_ptr<FileWriter>(
new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options)));
}
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index 50650bf..2cdd837 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -66,7 +66,11 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
public:
- std::shared_ptr<ipc::IpcWriteOptions> ipc_options;
+ /// Options passed to ipc::MakeFileWriter. use_threads is ignored
+ std::shared_ptr<ipc::IpcWriteOptions> options;
+
+ /// custom_metadata written to the file's footer
+ std::shared_ptr<const KeyValueMetadata> metadata;
protected:
using FileWriteOptions::FileWriteOptions;
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 808bae5..25bbe55 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -28,11 +28,13 @@
#include "arrow/dataset/partition.h"
#include "arrow/dataset/test_util.h"
#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/util.h"
+#include "arrow/util/key_value_metadata.h"
namespace arrow {
namespace dataset {
@@ -166,6 +168,35 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) {
AssertBufferEqual(*written, *source->buffer());
}
+TEST_F(TestIpcFileFormat, WriteRecordBatchReaderCustomOptions) {
+ std::shared_ptr<RecordBatchReader> reader = GetRecordBatchReader();
+ auto source = GetFileSource(reader.get());
+ reader = GetRecordBatchReader();
+
+ opts_ = ScanOptions::Make(reader->schema());
+
+ EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink());
+
+ auto ipc_options =
+ checked_pointer_cast<IpcFileWriteOptions>(format_->DefaultWriteOptions());
+ if (util::Codec::IsAvailable(Compression::ZSTD)) {
+ EXPECT_OK_AND_ASSIGN(ipc_options->options->codec,
+ util::Codec::Create(Compression::ZSTD));
+ }
+ ipc_options->metadata = key_value_metadata({{"hello", "world"}});
+ EXPECT_OK_AND_ASSIGN(auto writer,
+ format_->MakeWriter(sink, reader->schema(), ipc_options));
+ ASSERT_OK(writer->Write(reader.get()));
+ ASSERT_OK(writer->Finish());
+
+ EXPECT_OK_AND_ASSIGN(auto written, sink->Finish());
+ EXPECT_OK_AND_ASSIGN(auto ipc_reader, ipc::RecordBatchFileReader::Open(
+ std::make_shared<io::BufferReader>(written)));
+
+ EXPECT_EQ(ipc_reader->metadata()->sorted_pairs(),
+ ipc_options->metadata->sorted_pairs());
+}
+
class TestIpcFileSystemDataset : public testing::Test,
public WriteFileSystemDatasetMixin {
public:
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 017e545..0194160 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -114,7 +114,7 @@ Status ScannerBuilder::Project(std::vector<std::string> columns) {
Status ScannerBuilder::Filter(std::shared_ptr<Expression> filter) {
RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(FieldsInExpression(*filter)));
- RETURN_NOT_OK(filter->Validate(*schema()).status());
+ RETURN_NOT_OK(filter->Validate(*schema()));
scan_options_->filter = std::move(filter);
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index 5aa4601..5ce8885 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -795,8 +795,9 @@ Status WriteTable(const Table& table, io::OutputStream* dst,
return WriteFeatherV1(table, dst);
} else {
IpcWriteOptions ipc_options = IpcWriteOptions::Defaults();
- ipc_options.compression = properties.compression;
- ipc_options.compression_level = properties.compression_level;
+ ARROW_ASSIGN_OR_RAISE(
+ ipc_options.codec,
+ util::Codec::Create(properties.compression, properties.compression_level));
std::shared_ptr<RecordBatchWriter> writer;
ARROW_ASSIGN_OR_RAISE(writer, MakeFileWriter(dst, table.schema(), ipc_options));
diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index 9c967a5..a82aef3 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -903,15 +903,15 @@ static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
BodyCompressionOffset* out) {
- if (options.compression != Compression::UNCOMPRESSED) {
+ if (options.codec != nullptr) {
flatbuf::CompressionType codec;
- if (options.compression == Compression::LZ4_FRAME) {
+ if (options.codec->compression_type() == Compression::LZ4_FRAME) {
codec = flatbuf::CompressionType::LZ4_FRAME;
- } else if (options.compression == Compression::ZSTD) {
+ } else if (options.codec->compression_type() == Compression::ZSTD) {
codec = flatbuf::CompressionType::ZSTD;
} else {
return Status::Invalid("Unsupported IPC compression codec: ",
- util::Codec::GetCodecAsString(options.compression));
+ options.codec->name());
}
*out = flatbuf::CreateBodyCompression(fbb, codec,
flatbuf::BodyCompressionMethod::BUFFER);
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 6bbd7b8..bf535cd 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -59,8 +59,7 @@ struct ARROW_EXPORT IpcWriteOptions {
/// \brief Compression codec to use for record batch body buffers
///
/// May only be UNCOMPRESSED, LZ4_FRAME and ZSTD.
- Compression::type compression = Compression::UNCOMPRESSED;
- int compression_level = Compression::kUseDefaultCompressionLevel;
+ std::shared_ptr<util::Codec> codec;
/// \brief Use global CPU thread pool to parallelize any computational tasks
/// like compression
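
A minimal sketch of the corresponding caller-side change (mirroring the updates to
feather.cc and read_write_test.cc elsewhere in this diff; error handling elided):

    arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
    // Before: options.compression = arrow::Compression::ZSTD;
    //         options.compression_level = 3;
    // After: attach a ready-made codec; leaving `codec` null means no compression.
    options.codec =
        arrow::util::Codec::Create(arrow::Compression::ZSTD, /*compression_level=*/3)
            .ValueOrDie();
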
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 5d11c27..85f6e39 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -619,7 +619,7 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
continue;
}
IpcWriteOptions write_options = IpcWriteOptions::Defaults();
- write_options.compression = codec;
+ ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec));
CheckRoundtrip(*batch, write_options);
// Check non-parallel read and write
@@ -636,9 +636,9 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
if (!util::Codec::IsAvailable(codec)) {
continue;
}
- IpcWriteOptions options = IpcWriteOptions::Defaults();
- options.compression = codec;
- ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, options));
+ IpcWriteOptions write_options = IpcWriteOptions::Defaults();
+ ASSERT_OK_AND_ASSIGN(write_options.codec, util::Codec::Create(codec));
+ ASSERT_RAISES(Invalid, SerializeRecordBatch(*batch, write_options));
}
}
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9f9be32..adce911 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -185,17 +185,13 @@ class RecordBatchSerializer {
}
Status CompressBodyBuffers() {
- std::unique_ptr<util::Codec> codec;
-
- RETURN_NOT_OK(internal::CheckCompressionSupported(options_.compression));
-
- ARROW_ASSIGN_OR_RAISE(
- codec, util::Codec::Create(options_.compression, options_.compression_level));
+ RETURN_NOT_OK(
+ internal::CheckCompressionSupported(options_.codec->compression_type()));
auto CompressOne = [&](size_t i) {
if (out_->body_buffers[i]->size() > 0) {
- RETURN_NOT_OK(
- CompressBuffer(*out_->body_buffers[i], codec.get(), &out_->body_buffers[i]));
+ RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
+ &out_->body_buffers[i]));
}
return Status::OK();
};
@@ -216,7 +212,7 @@ class RecordBatchSerializer {
RETURN_NOT_OK(VisitArray(*batch.column(i)));
}
- if (options_.compression != Compression::UNCOMPRESSED) {
+ if (options_.codec != nullptr) {
RETURN_NOT_OK(CompressBodyBuffers());
}
@@ -227,8 +223,7 @@ class RecordBatchSerializer {
buffer_meta_.reserve(out_->body_buffers.size());
// Construct the buffer metadata for the record batch header
- for (size_t i = 0; i < out_->body_buffers.size(); ++i) {
- const Buffer* buffer = out_->body_buffers[i].get();
+ for (const auto& buffer : out_->body_buffers) {
int64_t size = 0;
int64_t padding = 0;
diff --git a/cpp/src/arrow/testing/gtest_util.h b/cpp/src/arrow/testing/gtest_util.h
index feba8ba..fe90c37 100644
--- a/cpp/src/arrow/testing/gtest_util.h
+++ b/cpp/src/arrow/testing/gtest_util.h
@@ -464,3 +464,15 @@ class ARROW_TESTING_EXPORT EnvVarGuard {
#endif
} // namespace arrow
+
+namespace nonstd {
+namespace sv_lite {
+
+// Without this hint, GTest will print string_views as a container of char
+template <class Char, class Traits = std::char_traits<Char>>
+void PrintTo(const basic_string_view<Char, Traits>& view, std::ostream* os) {
+ *os << view;
+}
+
+} // namespace sv_lite
+} // namespace nonstd
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index d05bd0f..f9c084f 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -24,160 +24,156 @@
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/compression_internal.h"
+#include "arrow/util/logging.h"
namespace arrow {
namespace util {
-Compressor::~Compressor() {}
-
-Decompressor::~Decompressor() {}
-
-Codec::~Codec() {}
-
int Codec::UseDefaultCompressionLevel() { return kUseDefaultCompressionLevel; }
Status Codec::Init() { return Status::OK(); }
-std::string Codec::GetCodecAsString(Compression::type t) {
+const std::string& Codec::GetCodecAsString(Compression::type t) {
+ static const std::string uncompressed = "uncompressed", snappy = "snappy",
+ gzip = "gzip", lzo = "lzo", brotli = "brotli",
+ lz4_raw = "lz4_raw", lz4 = "lz4", lz4_hadoop = "lz4_hadoop",
+ zstd = "zstd", bz2 = "bz2", unknown = "unknown";
+
switch (t) {
case Compression::UNCOMPRESSED:
- return "UNCOMPRESSED";
+ return uncompressed;
case Compression::SNAPPY:
- return "SNAPPY";
+ return snappy;
case Compression::GZIP:
- return "GZIP";
+ return gzip;
case Compression::LZO:
- return "LZO";
+ return lzo;
case Compression::BROTLI:
- return "BROTLI";
+ return brotli;
case Compression::LZ4:
- return "LZ4_RAW";
+ return lz4_raw;
case Compression::LZ4_FRAME:
- return "LZ4";
+ return lz4;
case Compression::LZ4_HADOOP:
- return "LZ4_HADOOP";
+ return lz4_hadoop;
case Compression::ZSTD:
- return "ZSTD";
+ return zstd;
case Compression::BZ2:
- return "BZ2";
+ return bz2;
default:
- return "UNKNOWN";
+ return unknown;
}
}
Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
- if (name == "UNCOMPRESSED") {
+ if (name == "uncompressed") {
return Compression::UNCOMPRESSED;
- } else if (name == "GZIP") {
+ } else if (name == "gzip") {
return Compression::GZIP;
- } else if (name == "SNAPPY") {
+ } else if (name == "snappy") {
return Compression::SNAPPY;
- } else if (name == "LZO") {
+ } else if (name == "lzo") {
return Compression::LZO;
- } else if (name == "BROTLI") {
+ } else if (name == "brotli") {
return Compression::BROTLI;
- } else if (name == "LZ4_RAW") {
+ } else if (name == "lz4_raw") {
return Compression::LZ4;
- } else if (name == "LZ4") {
+ } else if (name == "lz4") {
return Compression::LZ4_FRAME;
- } else if (name == "LZ4_HADOOP") {
+ } else if (name == "lz4_hadoop") {
return Compression::LZ4_HADOOP;
- } else if (name == "ZSTD") {
+ } else if (name == "zstd") {
return Compression::ZSTD;
- } else if (name == "BZ2") {
+ } else if (name == "bz2") {
return Compression::BZ2;
} else {
return Status::Invalid("Unrecognized compression type: ", name);
}
}
+bool Codec::SupportsCompressionLevel(Compression::type codec) {
+ switch (codec) {
+ case Compression::GZIP:
+ case Compression::BROTLI:
+ case Compression::ZSTD:
+ case Compression::BZ2:
+ return true;
+ default:
+ return false;
+ }
+}
+
Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
+ if (!IsAvailable(codec_type)) {
+ if (codec_type == Compression::LZO) {
+ return Status::NotImplemented("LZO codec not implemented");
+ }
+
+ auto name = GetCodecAsString(codec_type);
+ if (name == "unknown") {
+ return Status::Invalid("Unrecognized codec");
+ }
+
+ return Status::NotImplemented("Support for codec '", GetCodecAsString(codec_type),
+ "' not built");
+ }
+
+ if (compression_level != kUseDefaultCompressionLevel &&
+ !SupportsCompressionLevel(codec_type)) {
+ return Status::Invalid("Codec '", GetCodecAsString(codec_type),
+ "' doesn't support setting a compression level.");
+ }
+
std::unique_ptr<Codec> codec;
- const bool compression_level_set{compression_level != kUseDefaultCompressionLevel};
switch (codec_type) {
case Compression::UNCOMPRESSED:
- if (compression_level_set) {
- return Status::Invalid("Compression level cannot be specified for UNCOMPRESSED.");
- }
return nullptr;
case Compression::SNAPPY:
#ifdef ARROW_WITH_SNAPPY
- if (compression_level_set) {
- return Status::Invalid("Snappy doesn't support setting a compression level.");
- }
codec = internal::MakeSnappyCodec();
- break;
-#else
- return Status::NotImplemented("Snappy codec support not built");
#endif
+ break;
case Compression::GZIP:
#ifdef ARROW_WITH_ZLIB
codec = internal::MakeGZipCodec(compression_level);
- break;
-#else
- return Status::NotImplemented("Gzip codec support not built");
#endif
- case Compression::LZO:
- if (compression_level_set) {
- return Status::Invalid("LZ0 doesn't support setting a compression level.");
- }
- return Status::NotImplemented("LZO codec not implemented");
+ break;
case Compression::BROTLI:
#ifdef ARROW_WITH_BROTLI
codec = internal::MakeBrotliCodec(compression_level);
- break;
-#else
- return Status::NotImplemented("Brotli codec support not built");
#endif
+ break;
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
- if (compression_level_set) {
- return Status::Invalid("LZ4 doesn't support setting a compression level.");
- }
codec = internal::MakeLz4RawCodec();
- break;
-#else
- return Status::NotImplemented("LZ4 codec support not built");
#endif
+ break;
case Compression::LZ4_FRAME:
#ifdef ARROW_WITH_LZ4
- if (compression_level_set) {
- return Status::Invalid("LZ4 doesn't support setting a compression level.");
- }
codec = internal::MakeLz4FrameCodec();
- break;
-#else
- return Status::NotImplemented("LZ4 codec support not built");
#endif
+ break;
case Compression::LZ4_HADOOP:
#ifdef ARROW_WITH_LZ4
- if (compression_level_set) {
- return Status::Invalid("LZ4 doesn't support setting a compression level.");
- }
codec = internal::MakeLz4HadoopRawCodec();
- break;
-#else
- return Status::NotImplemented("LZ4 codec support not built");
#endif
+ break;
case Compression::ZSTD:
#ifdef ARROW_WITH_ZSTD
codec = internal::MakeZSTDCodec(compression_level);
- break;
-#else
- return Status::NotImplemented("ZSTD codec support not built");
#endif
+ break;
case Compression::BZ2:
#ifdef ARROW_WITH_BZ2
codec = internal::MakeBZ2Codec(compression_level);
- break;
-#else
- return Status::NotImplemented("BZ2 codec support not built");
#endif
+ break;
default:
- return Status::Invalid("Unrecognized codec");
+ break;
}
+ DCHECK_NE(codec, nullptr);
RETURN_NOT_OK(codec->Init());
return std::move(codec);
}
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 551ecbe..d3e8e1e 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -54,7 +54,7 @@ constexpr int kUseDefaultCompressionLevel = Compression::kUseDefaultCompressionL
///
class ARROW_EXPORT Compressor {
public:
- virtual ~Compressor();
+ virtual ~Compressor() = default;
struct CompressResult {
int64_t bytes_read;
@@ -96,7 +96,7 @@ class ARROW_EXPORT Compressor {
///
class ARROW_EXPORT Decompressor {
public:
- virtual ~Decompressor();
+ virtual ~Decompressor() = default;
struct DecompressResult {
// XXX is need_more_output necessary? (Brotli?)
@@ -128,14 +128,14 @@ class ARROW_EXPORT Decompressor {
/// \brief Compression codec
class ARROW_EXPORT Codec {
public:
- virtual ~Codec();
+ virtual ~Codec() = default;
/// \brief Return special value to indicate that a codec implementation
/// should use its default compression level
static int UseDefaultCompressionLevel();
/// \brief Return a string name for compression type
- static std::string GetCodecAsString(Compression::type t);
+ static const std::string& GetCodecAsString(Compression::type t);
/// \brief Return compression type for name (all upper case)
static Result<Compression::type> GetCompressionType(const std::string& name);
@@ -147,6 +147,9 @@ class ARROW_EXPORT Codec {
/// \brief Return true if support for indicated codec has been enabled
static bool IsAvailable(Compression::type codec);
+ /// \brief Return true if indicated codec supports setting a compression level
+ static bool SupportsCompressionLevel(Compression::type codec);
+
/// \brief One-shot decompression function
///
/// output_buffer_len must be correct and therefore be obtained in advance.
@@ -178,7 +181,14 @@ class ARROW_EXPORT Codec {
/// \brief Create a streaming compressor instance
virtual Result<std::shared_ptr<Decompressor>> MakeDecompressor() = 0;
- virtual const char* name() const = 0;
+ /// \brief This Codec's compression type
+ virtual Compression::type compression_type() const = 0;
+
+ /// \brief The name of this Codec's compression type
+ const std::string& name() const { return GetCodecAsString(compression_type()); }
+
+ /// \brief This Codec's compression level, if applicable
+ virtual int compression_level() const { return UseDefaultCompressionLevel(); }
private:
/// \brief Initializes the codec's resources.
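
A minimal sketch of the Codec introspection surface added above (a hypothetical
InspectCodec() function, assuming a build with ZSTD enabled; error handling elided):

    #include <arrow/util/compression.h>

    void InspectCodec() {
      using arrow::Compression;
      using arrow::util::Codec;
      if (Codec::SupportsCompressionLevel(Compression::ZSTD)) {
        auto codec = Codec::Create(Compression::ZSTD, 3).ValueOrDie();
        // name() is now derived from compression_type() and is lower case.
        const std::string& name = codec->name();  // "zstd"
        int level = codec->compression_level();   // 3
        (void)name;
        (void)level;
      }
    }
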
diff --git a/cpp/src/arrow/util/compression_brotli.cc b/cpp/src/arrow/util/compression_brotli.cc
index ca4f523..4feabe2 100644
--- a/cpp/src/arrow/util/compression_brotli.cc
+++ b/cpp/src/arrow/util/compression_brotli.cc
@@ -38,8 +38,6 @@ namespace {
class BrotliDecompressor : public Decompressor {
public:
- BrotliDecompressor() {}
-
~BrotliDecompressor() override {
if (state_ != nullptr) {
BrotliDecoderDestroyInstance(state_);
@@ -167,7 +165,7 @@ class BrotliCompressor : public Compressor {
BrotliEncoderState* state_ = nullptr;
private:
- int compression_level_;
+ const int compression_level_;
};
// ----------------------------------------------------------------------
@@ -175,11 +173,10 @@ class BrotliCompressor : public Compressor {
class BrotliCodec : public Codec {
public:
- explicit BrotliCodec(int compression_level) {
- compression_level_ = compression_level == kUseDefaultCompressionLevel
- ? kBrotliDefaultCompressionLevel
- : compression_level;
- }
+ explicit BrotliCodec(int compression_level)
+ : compression_level_(compression_level == kUseDefaultCompressionLevel
+ ? kBrotliDefaultCompressionLevel
+ : compression_level) {}
Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
@@ -224,10 +221,12 @@ class BrotliCodec : public Codec {
return ptr;
}
- const char* name() const override { return "brotli"; }
+ Compression::type compression_type() const override { return Compression::BROTLI; }
+
+ int compression_level() const override { return compression_level_; }
private:
- int compression_level_;
+ const int compression_level_;
};
} // namespace
diff --git a/cpp/src/arrow/util/compression_bz2.cc b/cpp/src/arrow/util/compression_bz2.cc
index dfb266a..8a8c1cb 100644
--- a/cpp/src/arrow/util/compression_bz2.cc
+++ b/cpp/src/arrow/util/compression_bz2.cc
@@ -262,7 +262,9 @@ class BZ2Codec : public Codec {
return ptr;
}
- const char* name() const override { return "bz2"; }
+ Compression::type compression_type() const override { return Compression::BZ2; }
+
+ int compression_level() const override { return compression_level_; }
private:
int compression_level_;
diff --git a/cpp/src/arrow/util/compression_lz4.cc b/cpp/src/arrow/util/compression_lz4.cc
index 17a8514..365cd0f 100644
--- a/cpp/src/arrow/util/compression_lz4.cc
+++ b/cpp/src/arrow/util/compression_lz4.cc
@@ -298,10 +298,10 @@ class Lz4FrameCodec : public Codec {
return ptr;
}
- const char* name() const override { return "lz4"; }
+ Compression::type compression_type() const override { return Compression::LZ4_FRAME; }
protected:
- LZ4F_preferences_t prefs_;
+ const LZ4F_preferences_t prefs_;
};
// ----------------------------------------------------------------------
@@ -348,7 +348,7 @@ class Lz4Codec : public Codec {
"Try using LZ4 frame format instead.");
}
- const char* name() const override { return "lz4_raw"; }
+ Compression::type compression_type() const override { return Compression::LZ4; }
};
// ----------------------------------------------------------------------
@@ -407,7 +407,7 @@ class Lz4HadoopCodec : public Lz4Codec {
"Try using LZ4 frame format instead.");
}
- const char* name() const override { return "lz4_hadoop_raw"; }
+ Compression::type compression_type() const override { return Compression::LZ4_HADOOP; }
protected:
// Offset starting at which page data can be read/written
diff --git a/cpp/src/arrow/util/compression_snappy.cc b/cpp/src/arrow/util/compression_snappy.cc
index 9cd06dc..9b01687 100644
--- a/cpp/src/arrow/util/compression_snappy.cc
+++ b/cpp/src/arrow/util/compression_snappy.cc
@@ -41,8 +41,6 @@ namespace {
class SnappyCodec : public Codec {
public:
- SnappyCodec() {}
-
Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
size_t decompressed_size;
@@ -87,7 +85,7 @@ class SnappyCodec : public Codec {
return Status::NotImplemented("Streaming decompression unsupported with Snappy");
}
- const char* name() const override { return "snappy"; }
+ Compression::type compression_type() const override { return Compression::SNAPPY; }
};
} // namespace
diff --git a/cpp/src/arrow/util/compression_test.cc b/cpp/src/arrow/util/compression_test.cc
index 1df5236..8b184f2 100644
--- a/cpp/src/arrow/util/compression_test.cc
+++ b/cpp/src/arrow/util/compression_test.cc
@@ -320,30 +320,30 @@ class CodecTest : public ::testing::TestWithParam<Compression::type> {
};
TEST(TestCodecMisc, GetCodecAsString) {
- ASSERT_EQ("UNCOMPRESSED", Codec::GetCodecAsString(Compression::UNCOMPRESSED));
- ASSERT_EQ("SNAPPY", Codec::GetCodecAsString(Compression::SNAPPY));
- ASSERT_EQ("GZIP", Codec::GetCodecAsString(Compression::GZIP));
- ASSERT_EQ("LZO", Codec::GetCodecAsString(Compression::LZO));
- ASSERT_EQ("BROTLI", Codec::GetCodecAsString(Compression::BROTLI));
- ASSERT_EQ("LZ4_RAW", Codec::GetCodecAsString(Compression::LZ4));
- ASSERT_EQ("LZ4", Codec::GetCodecAsString(Compression::LZ4_FRAME));
- ASSERT_EQ("ZSTD", Codec::GetCodecAsString(Compression::ZSTD));
- ASSERT_EQ("BZ2", Codec::GetCodecAsString(Compression::BZ2));
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::UNCOMPRESSED), "uncompressed");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::SNAPPY), "snappy");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::GZIP), "gzip");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::LZO), "lzo");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::BROTLI), "brotli");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4), "lz4_raw");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::LZ4_FRAME), "lz4");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::ZSTD), "zstd");
+ EXPECT_EQ(Codec::GetCodecAsString(Compression::BZ2), "bz2");
}
TEST(TestCodecMisc, GetCompressionType) {
- ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("UNCOMPRESSED"));
- ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("SNAPPY"));
- ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("GZIP"));
- ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("LZO"));
- ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("BROTLI"));
- ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("LZ4_RAW"));
- ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("LZ4"));
- ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("ZSTD"));
- ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("BZ2"));
+ ASSERT_OK_AND_EQ(Compression::UNCOMPRESSED, Codec::GetCompressionType("uncompressed"));
+ ASSERT_OK_AND_EQ(Compression::SNAPPY, Codec::GetCompressionType("snappy"));
+ ASSERT_OK_AND_EQ(Compression::GZIP, Codec::GetCompressionType("gzip"));
+ ASSERT_OK_AND_EQ(Compression::LZO, Codec::GetCompressionType("lzo"));
+ ASSERT_OK_AND_EQ(Compression::BROTLI, Codec::GetCompressionType("brotli"));
+ ASSERT_OK_AND_EQ(Compression::LZ4, Codec::GetCompressionType("lz4_raw"));
+ ASSERT_OK_AND_EQ(Compression::LZ4_FRAME, Codec::GetCompressionType("lz4"));
+ ASSERT_OK_AND_EQ(Compression::ZSTD, Codec::GetCompressionType("zstd"));
+ ASSERT_OK_AND_EQ(Compression::BZ2, Codec::GetCompressionType("bz2"));
ASSERT_RAISES(Invalid, Codec::GetCompressionType("unk"));
- ASSERT_RAISES(Invalid, Codec::GetCompressionType("snappy"));
+ ASSERT_RAISES(Invalid, Codec::GetCompressionType("SNAPPY"));
}
TEST_P(CodecTest, CodecRoundtrip) {
diff --git a/cpp/src/arrow/util/compression_zlib.cc b/cpp/src/arrow/util/compression_zlib.cc
index 717bbc6..84e517e 100644
--- a/cpp/src/arrow/util/compression_zlib.cc
+++ b/cpp/src/arrow/util/compression_zlib.cc
@@ -463,7 +463,9 @@ class GZipCodec : public Codec {
return InitDecompressor();
}
- const char* name() const override { return "gzip"; }
+ Compression::type compression_type() const override { return Compression::GZIP; }
+
+ int compression_level() const override { return compression_level_; }
private:
// zlib is stateful and the z_stream state variable must be initialized
diff --git a/cpp/src/arrow/util/compression_zstd.cc b/cpp/src/arrow/util/compression_zstd.cc
index e740960..382e057 100644
--- a/cpp/src/arrow/util/compression_zstd.cc
+++ b/cpp/src/arrow/util/compression_zstd.cc
@@ -173,20 +173,19 @@ class ZSTDCompressor : public Compressor {
class ZSTDCodec : public Codec {
public:
- explicit ZSTDCodec(int compression_level) {
- compression_level_ = compression_level == kUseDefaultCompressionLevel
- ? kZSTDDefaultCompressionLevel
- : compression_level;
- }
+ explicit ZSTDCodec(int compression_level)
+ : compression_level_(compression_level == kUseDefaultCompressionLevel
+ ? kZSTDDefaultCompressionLevel
+ : compression_level) {}
Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
if (output_buffer == nullptr) {
// We may pass a NULL 0-byte output buffer but some zstd versions demand
// a valid pointer: https://github.com/facebook/zstd/issues/1385
- static uint8_t empty_buffer[1];
+ static uint8_t empty_buffer;
DCHECK_EQ(output_buffer_len, 0);
- output_buffer = empty_buffer;
+ output_buffer = &empty_buffer;
}
size_t ret = ZSTD_decompress(output_buffer,
static_cast<size_t>(output_buffer_len),
@@ -228,10 +227,12 @@ class ZSTDCodec : public Codec {
return ptr;
}
- const char* name() const override { return "zstd"; }
+ Compression::type compression_type() const override { return Compression::ZSTD; }
+
+ int compression_level() const override { return compression_level_; }
private:
- int compression_level_;
+ const int compression_level_;
};
} // namespace
diff --git a/cpp/src/arrow/util/key_value_metadata.h b/cpp/src/arrow/util/key_value_metadata.h
index 2f64522..d4207a5 100644
--- a/cpp/src/arrow/util/key_value_metadata.h
+++ b/cpp/src/arrow/util/key_value_metadata.h
@@ -51,10 +51,13 @@ class ARROW_EXPORT KeyValueMetadata {
Status Set(const std::string& key, const std::string& value);
void reserve(int64_t n);
- int64_t size() const;
+ int64_t size() const;
const std::string& key(int64_t i) const;
const std::string& value(int64_t i) const;
+ const std::vector<std::string>& keys() const { return keys_; }
+ const std::vector<std::string>& values() const { return values_; }
+
std::vector<std::pair<std::string, std::string>> sorted_pairs() const;
/// \brief Perform linear search for key, returning -1 if not found
diff --git a/cpp/src/arrow/util/string.cc b/cpp/src/arrow/util/string.cc
index 625086c..691f10e 100644
--- a/cpp/src/arrow/util/string.cc
+++ b/cpp/src/arrow/util/string.cc
@@ -144,6 +144,14 @@ std::string AsciiToLower(util::string_view value) {
return result;
}
+std::string AsciiToUpper(util::string_view value) {
+ // TODO: ASCII validation
+ std::string result = std::string(value);
+ std::transform(result.begin(), result.end(), result.begin(),
+ [](unsigned char c) { return std::toupper(c); });
+ return result;
+}
+
util::optional<std::string> Replace(util::string_view s, util::string_view token,
util::string_view replacement) {
size_t token_start = s.find(token);
diff --git a/cpp/src/arrow/util/string.h b/cpp/src/arrow/util/string.h
index 56a02df..feb75d4 100644
--- a/cpp/src/arrow/util/string.h
+++ b/cpp/src/arrow/util/string.h
@@ -57,6 +57,9 @@ bool AsciiEqualsCaseInsensitive(util::string_view left, util::string_view right)
ARROW_EXPORT
std::string AsciiToLower(util::string_view value);
+ARROW_EXPORT
+std::string AsciiToUpper(util::string_view value);
+
/// \brief Search for the first instance of a token and replace it or return nullopt if
/// the token is not found.
ARROW_EXPORT
diff --git a/cpp/src/parquet/printer.cc b/cpp/src/parquet/printer.cc
index a667a48..224a19d 100644
--- a/cpp/src/parquet/printer.cc
+++ b/cpp/src/parquet/printer.cc
@@ -25,6 +25,7 @@
#include <vector>
#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/string.h"
#include "parquet/column_scanner.h"
#include "parquet/exception.h"
@@ -121,7 +122,9 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
stream << " Statistics Not Set";
}
stream << std::endl
- << " Compression: " << Codec::GetCodecAsString(column_chunk->compression())
+ << " Compression: "
+ << arrow::internal::AsciiToUpper(
+ Codec::GetCodecAsString(column_chunk->compression()))
<< ", Encodings:";
for (auto encoding : column_chunk->encodings()) {
stream << " " << EncodingToString(encoding);
@@ -256,7 +259,8 @@ void ParquetFilePrinter::JSONPrint(std::ostream& stream, std::list<int> selected
stream << "\"False\",";
}
stream << "\n \"Compression\": \""
- << Codec::GetCodecAsString(column_chunk->compression())
+ << arrow::internal::AsciiToUpper(
+ Codec::GetCodecAsString(column_chunk->compression()))
<< "\", \"Encodings\": \"";
for (auto encoding : column_chunk->encodings()) {
stream << EncodingToString(encoding) << " ";
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index dee022f..afcfacc 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1337,7 +1337,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
c_bool write_legacy_ipc_format
CMemoryPool* memory_pool
CMetadataVersion metadata_version
- CCompressionType compression
+ shared_ptr[CCodec] codec
c_bool use_threads
@staticmethod
@@ -2055,7 +2055,7 @@ cdef extern from 'arrow/util/compression.h' namespace 'arrow' nogil:
CResult[int64_t] Compress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len,
uint8_t* output_buffer)
- const char* name() const
+ c_string name() const
int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input)
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a12915e..3fc0984 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1557,25 +1557,6 @@ cdef CCompressionType _ensure_compression(str name) except *:
raise ValueError('Invalid value for compression: {!r}'.format(name))
-cdef str _compression_name(CCompressionType ctype):
- if ctype == CCompressionType_GZIP:
- return 'gzip'
- elif ctype == CCompressionType_BROTLI:
- return 'brotli'
- elif ctype == CCompressionType_BZ2:
- return 'bz2'
- elif ctype == CCompressionType_LZ4_FRAME:
- return 'lz4'
- elif ctype == CCompressionType_LZ4:
- return 'lz4_raw'
- elif ctype == CCompressionType_SNAPPY:
- return 'snappy'
- elif ctype == CCompressionType_ZSTD:
- return 'zstd'
- else:
- raise RuntimeError('Unexpected CCompressionType value')
-
-
cdef class Codec(_Weakrefable):
"""
Compression codec.
diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi
index 74a81c6..5c8194d 100644
--- a/python/pyarrow/ipc.pxi
+++ b/python/pyarrow/ipc.pxi
@@ -94,17 +94,18 @@ cdef class IpcWriteOptions(_Weakrefable):
@property
def compression(self):
- if self.c_options.compression == CCompressionType_UNCOMPRESSED:
+ if self.c_options.codec == nullptr:
return None
else:
- return _compression_name(self.c_options.compression)
+ return frombytes(self.c_options.codec.get().name())
@compression.setter
def compression(self, value):
if value is None:
- self.c_options.compression = CCompressionType_UNCOMPRESSED
+ self.c_options.codec.reset()
else:
- self.c_options.compression = _ensure_compression(value)
+ self.c_options.codec = shared_ptr[CCodec](GetResultValue(
+ CCodec.Create(_ensure_compression(value))).release())
@property
def use_threads(self):
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 7c03732..cf83d69 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -432,6 +432,14 @@ dataset___ParquetFileWriteOptions__update <- function(options, writer_props, arr
invisible(.Call(`_arrow_dataset___ParquetFileWriteOptions__update` , options, writer_props, arrow_writer_props))
}
+dataset___IpcFileWriteOptions__update2 <- function(ipc_options, use_legacy_format, codec, metadata_version){
+ invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update2` , ipc_options, use_legacy_format, codec, metadata_version))
+}
+
+dataset___IpcFileWriteOptions__update1 <- function(ipc_options, use_legacy_format, metadata_version){
+ invisible(.Call(`_arrow_dataset___IpcFileWriteOptions__update1` , ipc_options, use_legacy_format, metadata_version))
+}
+
dataset___IpcFileFormat__Make <- function(){
.Call(`_arrow_dataset___IpcFileFormat__Make` )
}
diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R
index ec3ac36..8300e41 100644
--- a/r/R/dataset-format.R
+++ b/r/R/dataset-format.R
@@ -139,6 +139,18 @@ FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject,
dataset___ParquetFileWriteOptions__update(self,
ParquetWriterProperties$create(...),
ParquetArrowWriterProperties$create(...))
+ } else if (self$type == "ipc") {
+ args <- list(...)
+ if (is.null(args$codec)) {
+ dataset___IpcFileWriteOptions__update1(self,
+ get_ipc_use_legacy_format(args$use_legacy_format),
+ get_ipc_metadata_version(args$metadata_version))
+ } else {
+ dataset___IpcFileWriteOptions__update2(self,
+ get_ipc_use_legacy_format(args$use_legacy_format),
+ args$codec,
+ get_ipc_metadata_version(args$metadata_version))
+ }
}
invisible(self)
}
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index b97a4e3..c5c9292 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -42,7 +42,16 @@
#' @param hive_style logical: write partition segments as Hive-style
#' (`key1=value1/key2=value2/file.ext`) or as just bare values. Default is `TRUE`.
#' @param ... additional format-specific arguments. For available Parquet
-#' options, see [write_parquet()].
+#' options, see [write_parquet()]. The available Feather options are
+#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
+#' versions 0.14 and lower can read it. Default is `FALSE`. You can also
+#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
+#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
+#' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+#' unless the environment variable `ARROW_PRE_1_0_METADATA_VERSION=1`, in
+#' which case it will be V4.
+#' - `codec`: A [Codec] which will be used to compress body buffers of written
+#' files. Default (NULL) will not compress body buffers.
#' @return The input `dataset`, invisibly
#' @export
write_dataset <- function(dataset,
diff --git a/r/R/record-batch-writer.R b/r/R/record-batch-writer.R
--- a/r/R/record-batch-writer.R
+++ b/r/R/record-batch-writer.R
@@ -39,7 +39,7 @@
#' - `sink` An `OutputStream`
#' - `schema` A [Schema] for the data to be written
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
-#' versions 0.14 and lower can read it? Default is `FALSE`. You can also
+#' versions 0.14 and lower can read it. Default is `FALSE`. You can also
#' enable this by setting the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1`.
#' - `metadata_version`: A string like "V5" or the equivalent integer indicating
#' the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
@@ -130,7 +130,6 @@ RecordBatchStreamWriter$create <- function(sink,
call. = FALSE
)
}
- use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
assert_is(sink, "OutputStream")
assert_is(schema, "Schema")
@@ -138,7 +137,7 @@ RecordBatchStreamWriter$create <- function(sink,
ipc___RecordBatchStreamWriter__Open(
sink,
schema,
- isTRUE(use_legacy_format),
+ get_ipc_use_legacy_format(use_legacy_format),
get_ipc_metadata_version(metadata_version)
)
)
@@ -160,7 +159,6 @@ RecordBatchFileWriter$create <- function(sink,
call. = FALSE
)
}
- use_legacy_format <- use_legacy_format %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1")
assert_is(sink, "OutputStream")
assert_is(schema, "Schema")
@@ -168,7 +166,7 @@ RecordBatchFileWriter$create <- function(sink,
ipc___RecordBatchFileWriter__Open(
sink,
schema,
- isTRUE(use_legacy_format),
+ get_ipc_use_legacy_format(use_legacy_format),
get_ipc_metadata_version(metadata_version)
)
)
@@ -196,3 +194,7 @@ get_ipc_metadata_version <- function(x) {
}
out
}
+
+get_ipc_use_legacy_format <- function(x) {
+ isTRUE(x %||% identical(Sys.getenv("ARROW_PRE_0_15_IPC_FORMAT"), "1"))
+}
diff --git a/r/man/RecordBatchWriter.Rd b/r/man/RecordBatchWriter.Rd
index 0422da6..038653b 100644
--- a/r/man/RecordBatchWriter.Rd
+++ b/r/man/RecordBatchWriter.Rd
@@ -23,7 +23,7 @@ factory methods instantiate the object and take the following arguments:
\item \code{sink} An \code{OutputStream}
\item \code{schema} A \link{Schema} for the data to be written
\item \code{use_legacy_format} logical: write data formatted so that Arrow libraries
-versions 0.14 and lower can read it? Default is \code{FALSE}. You can also
+versions 0.14 and lower can read it. Default is \code{FALSE}. You can also
enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}.
\item \code{metadata_version}: A string like "V5" or the equivalent integer indicating
the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
diff --git a/r/man/write_dataset.Rd b/r/man/write_dataset.Rd
index cb3853a..755c4c9 100644
--- a/r/man/write_dataset.Rd
+++ b/r/man/write_dataset.Rd
@@ -42,7 +42,18 @@ will yield \verb{"part-0.feather", ...}.}
(\code{key1=value1/key2=value2/file.ext}) or as just bare values. Default is \code{TRUE}.}
\item{...}{additional format-specific arguments. For available Parquet
-options, see \code{\link[=write_parquet]{write_parquet()}}.}
+options, see \code{\link[=write_parquet]{write_parquet()}}. The available Feather options are
+\itemize{
+\item \code{use_legacy_format} logical: write data formatted so that Arrow libraries
+versions 0.14 and lower can read it. Default is \code{FALSE}. You can also
+enable this by setting the environment variable \code{ARROW_PRE_0_15_IPC_FORMAT=1}.
+\item \code{metadata_version}: A string like "V5" or the equivalent integer indicating
+the Arrow IPC MetadataVersion. Default (NULL) will use the latest version,
+unless the environment variable \code{ARROW_PRE_1_0_METADATA_VERSION=1}, in
+which case it will be V4.
+\item \code{codec}: A \link{Codec} which will be used to compress body buffers of written
+files. Default (NULL) will not compress body buffers.
+}}
}
\value{
The input \code{dataset}, invisibly
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index d0273a5..cadfc8c 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1697,6 +1697,43 @@ extern "C" SEXP _arrow_dataset___ParquetFileWriteOptions__update(SEXP options_se
// dataset.cpp
#if defined(ARROW_R_WITH_ARROW)
+void dataset___IpcFileWriteOptions__update2(const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format, const std::shared_ptr<arrow::util::Codec>& codec, arrow::ipc::MetadataVersion metadata_version);
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<ds::IpcFileWriteOptions>&>::type ipc_options(ipc_options_sexp);
+ arrow::r::Input<bool>::type use_legacy_format(use_legacy_format_sexp);
+ arrow::r::Input<const std::shared_ptr<arrow::util::Codec>&>::type codec(codec_sexp);
+ arrow::r::Input<arrow::ipc::MetadataVersion>::type metadata_version(metadata_version_sexp);
+ dataset___IpcFileWriteOptions__update2(ipc_options, use_legacy_format, codec, metadata_version);
+ return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update2(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP codec_sexp, SEXP metadata_version_sexp){
+ Rf_error("Cannot call dataset___IpcFileWriteOptions__update2(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
+void dataset___IpcFileWriteOptions__update1(const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format, arrow::ipc::MetadataVersion metadata_version);
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){
+BEGIN_CPP11
+ arrow::r::Input<const std::shared_ptr<ds::IpcFileWriteOptions>&>::type ipc_options(ipc_options_sexp);
+ arrow::r::Input<bool>::type use_legacy_format(use_legacy_format_sexp);
+ arrow::r::Input<arrow::ipc::MetadataVersion>::type metadata_version(metadata_version_sexp);
+ dataset___IpcFileWriteOptions__update1(ipc_options, use_legacy_format, metadata_version);
+ return R_NilValue;
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___IpcFileWriteOptions__update1(SEXP ipc_options_sexp, SEXP use_legacy_format_sexp, SEXP metadata_version_sexp){
+ Rf_error("Cannot call dataset___IpcFileWriteOptions__update1(). Please use arrow::install_arrow() to install required runtime libraries. ");
+}
+#endif
+
+// dataset.cpp
+#if defined(ARROW_R_WITH_ARROW)
std::shared_ptr<ds::IpcFileFormat> dataset___IpcFileFormat__Make();
extern "C" SEXP _arrow_dataset___IpcFileFormat__Make(){
BEGIN_CPP11
@@ -6399,6 +6436,8 @@ static const R_CallMethodDef CallEntries[] = {
{ "_arrow_dataset___ParquetFileFormat__Make", (DL_FUNC) &_arrow_dataset___ParquetFileFormat__Make, 3},
{ "_arrow_dataset___FileWriteOptions__type_name", (DL_FUNC) &_arrow_dataset___FileWriteOptions__type_name, 1},
{ "_arrow_dataset___ParquetFileWriteOptions__update", (DL_FUNC) &_arrow_dataset___ParquetFileWriteOptions__update, 3},
+ { "_arrow_dataset___IpcFileWriteOptions__update2", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update2, 4},
+ { "_arrow_dataset___IpcFileWriteOptions__update1", (DL_FUNC) &_arrow_dataset___IpcFileWriteOptions__update1, 3},
{ "_arrow_dataset___IpcFileFormat__Make", (DL_FUNC) &_arrow_dataset___IpcFileFormat__Make, 0},
{ "_arrow_dataset___CsvFileFormat__Make", (DL_FUNC) &_arrow_dataset___CsvFileFormat__Make, 1},
{ "_arrow_dataset___DirectoryPartitioning", (DL_FUNC) &_arrow_dataset___DirectoryPartitioning, 1},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index f4c3ed1..8a88ab0 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -21,6 +21,7 @@
#include <arrow/dataset/api.h>
#include <arrow/filesystem/filesystem.h>
+#include <arrow/ipc/writer.h>
#include <arrow/table.h>
#include <arrow/util/iterator.h>
@@ -208,6 +209,24 @@ void dataset___ParquetFileWriteOptions__update(
}
// [[arrow::export]]
+void dataset___IpcFileWriteOptions__update2(
+ const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format,
+ const std::shared_ptr<arrow::util::Codec>& codec,
+ arrow::ipc::MetadataVersion metadata_version) {
+ ipc_options->options->write_legacy_ipc_format = use_legacy_format;
+ ipc_options->options->codec = codec;
+ ipc_options->options->metadata_version = metadata_version;
+}
+
+// [[arrow::export]]
+void dataset___IpcFileWriteOptions__update1(
+ const std::shared_ptr<ds::IpcFileWriteOptions>& ipc_options, bool use_legacy_format,
+ arrow::ipc::MetadataVersion metadata_version) {
+ ipc_options->options->write_legacy_ipc_format = use_legacy_format;
+ ipc_options->options->metadata_version = metadata_version;
+}
+
+// [[arrow::export]]
std::shared_ptr<ds::IpcFileFormat> dataset___IpcFileFormat__Make() {
return std::make_shared<ds::IpcFileFormat>();
}
diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R
index 074fcd3..d1904fc 100644
--- a/r/tests/testthat/test-dataset.R
+++ b/r/tests/testthat/test-dataset.R
@@ -177,7 +177,7 @@ test_that("dataset from URI", {
test_that("Simple interface for datasets (custom ParquetFileFormat)", {
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()),
format = FileFormat$create("parquet", dict_columns = c("chr")))
- expect_equivalent(ds$schema$GetFieldByName("chr")$type, dictionary())
+ expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary())
})
test_that("Hive partitioning", {
@@ -943,10 +943,38 @@ test_that("Dataset writing: from RecordBatch", {
)
})
+test_that("Writing a dataset: Ipc format options & compression", {
+ skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
+ ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
+ dst_dir <- make_temp_dir()
+
+ codec <- NULL
+ if (codec_is_available("zstd")) {
+ codec <- Codec$create("zstd")
+ }
+
+ write_dataset(ds, dst_dir, format = "feather", codec = codec)
+ expect_true(dir.exists(dst_dir))
+
+ new_ds <- open_dataset(dst_dir, format = "feather")
+ expect_equivalent(
+ new_ds %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6 & integer < 11) %>%
+ collect() %>%
+ summarize(mean = mean(integer)),
+ df1 %>%
+ select(string = chr, integer = int) %>%
+ filter(integer > 6) %>%
+ summarize(mean = mean(integer))
+ )
+})
+
test_that("Writing a dataset: Parquet format options", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
dst_dir <- make_temp_dir()
+ dst_dir_no_truncated_timestamps <- make_temp_dir()
# Use trace() to confirm that options are passed in
trace(
@@ -956,7 +984,7 @@ test_that("Writing a dataset: Parquet format options", {
where = write_dataset
)
expect_warning(
- write_dataset(ds, make_temp_dir(), format = "parquet", partitioning = "int"),
+ write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"),
"allow_truncated_timestamps == FALSE"
)
expect_warning(
diff --git a/r/tests/testthat/test-record-batch-reader.R b/r/tests/testthat/test-record-batch-reader.R
index e03664e..d9c3406 100644
--- a/r/tests/testthat/test-record-batch-reader.R
+++ b/r/tests/testthat/test-record-batch-reader.R
@@ -82,7 +82,7 @@ test_that("MetadataFormat", {
Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = 1)
expect_identical(get_ipc_metadata_version(NULL), 3L)
Sys.setenv(ARROW_PRE_0_15_IPC_FORMAT = "")
-
+
expect_identical(get_ipc_metadata_version(NULL), 4L)
Sys.setenv(ARROW_PRE_1_0_METADATA_VERSION = 1)
expect_identical(get_ipc_metadata_version(NULL), 3L)