This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new c6e76809a Split out integration test plumbing (#2594) (#2300) (#2598)
c6e76809a is described below
commit c6e76809a33e2e85df25ff5cefad93dc6a4499b8
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sun Aug 28 10:55:23 2022 +0100
Split out integration test plumbing (#2594) (#2300) (#2598)
* Split out integration test plumbing (#2594) (#2300)
* Fix RAT
---
.github/workflows/arrow.yml | 2 +
arrow/Cargo.toml | 1 -
arrow/src/ipc/reader.rs | 346 +----------------
arrow/src/ipc/writer.rs | 416 ++-------------------
arrow/src/util/mod.rs | 2 -
dev/release/rat_exclude_files.txt | 1 +
integration-testing/Cargo.toml | 9 +-
.../data/integration.json | 0
.../src/bin/arrow-json-integration-test.rs | 3 +-
integration-testing/src/lib.rs | 24 +-
.../src/util.rs | 23 +-
integration-testing/tests/ipc_reader.rs | 293 +++++++++++++++
integration-testing/tests/ipc_writer.rs | 314 ++++++++++++++++
13 files changed, 677 insertions(+), 757 deletions(-)
diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml
index 2af4e1a71..0256307b3 100644
--- a/.github/workflows/arrow.yml
+++ b/.github/workflows/arrow.yml
@@ -64,6 +64,8 @@ jobs:
cargo run --example dynamic_types
cargo run --example read_csv
cargo run --example read_csv_infer_schema
+ - name: Run non-archery based integration-tests
+ run: cargo test -p arrow-integration-testing
# test compilaton features
linux-features:
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 46801ea05..7ef9935ea 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -61,7 +61,6 @@ packed_simd = { version = "0.3", default-features = false,
optional = true, pack
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.6", default-features = false, optional = true }
flatbuffers = { version = "2.1.2", default-features = false, features =
["thiserror"], optional = true }
-hex = { version = "0.4", default-features = false, features = ["std"] }
comfy-table = { version = "6.0", optional = true, default-features = false }
pyo3 = { version = "0.17", default-features = false, optional = true }
lexical-core = { version = "^0.8", default-features = false, features =
["write-integers", "write-floats", "parse-integers", "parse-floats"] }
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 7ffa9aa59..969c8c43f 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -1173,336 +1173,8 @@ mod tests {
use std::fs::File;
- use flate2::read::GzDecoder;
-
+ use crate::datatypes;
use crate::datatypes::{ArrowNativeType, Float64Type, Int32Type, Int8Type};
- use crate::{datatypes, util::integration_util::*};
-
- #[test]
- #[cfg(not(feature = "force_validate"))]
- fn read_generated_files_014() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- #[should_panic(expected = "Big Endian is not supported for Decimal!")]
- fn read_decimal_be_file_should_panic() {
- let testdata = crate::util::test_util::arrow_test_data();
- let file = File::open(format!(
-
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
- testdata
- ))
- .unwrap();
- FileReader::try_new(file, None).unwrap();
- }
-
- #[test]
- #[should_panic(
- expected = "Last offset 687865856 of Utf8 is larger than values length
41"
- )]
- fn read_dictionary_be_not_implemented() {
- // The offsets are not translated for big-endian files
- // https://github.com/apache/arrow-rs/issues/859
- let testdata = crate::util::test_util::arrow_test_data();
- let file = File::open(format!(
-
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file",
- testdata
- ))
- .unwrap();
- FileReader::try_new(file, None).unwrap();
- }
-
- #[test]
- fn read_generated_be_files_should_work() {
- // complementary to the previous test
- let testdata = crate::util::test_util::arrow_test_data();
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_map",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
-
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file",
- testdata, path
- ))
- .unwrap();
-
- FileReader::try_new(file, None).unwrap();
- });
- }
-
- #[test]
- fn projection_should_work() {
- // complementary to the previous test
- let testdata = crate::util::test_util::arrow_test_data();
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_map",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- // We must use littleendian files here.
- // The offsets are not translated for big-endian files
- // https://github.com/apache/arrow-rs/issues/859
- let file = File::open(format!(
-
"{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file",
- testdata, path
- ))
- .unwrap();
-
- let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
- let datatype_0 = reader.schema().fields()[0].data_type().clone();
- reader.for_each(|batch| {
- let batch = batch.unwrap();
- assert_eq!(batch.columns().len(), 1);
- assert_eq!(datatype_0,
batch.schema().fields()[0].data_type().clone());
- });
- });
- }
-
- #[test]
- #[cfg(not(feature = "force_validate"))]
- fn read_generated_streams_014() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- // the next batch must be empty
- assert!(reader.next().is_none());
- // the stream must indicate that it's finished
- assert!(reader.is_finished());
- });
- }
-
- #[test]
- fn read_generated_files_100() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- // "generated_map_non_canonical",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- fn read_generated_streams_100() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- // "generated_map_non_canonical",
- "generated_nested",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- // the next batch must be empty
- assert!(reader.next().is_none());
- // the stream must indicate that it's finished
- assert!(reader.is_finished());
- });
- }
-
- #[test]
- #[cfg(feature = "ipc_compression")]
- fn read_generated_streams_200() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
-
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- // the next batch must be empty
- assert!(reader.next().is_none());
- // the stream must indicate that it's finished
- assert!(reader.is_finished());
- });
- }
-
- #[test]
- #[cfg(not(feature = "ipc_compression"))]
- fn read_generated_streams_200_negative() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
-
- // the test is repetitive, thus we can read all supported files at once
- let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd",
"ZSTD")];
- cases.iter().for_each(|(path, compression_name)| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = StreamReader::try_new(file, None).unwrap();
- let err = reader.next().unwrap().unwrap_err();
- let expected_error = format!(
- "Invalid argument error: compression type {} not supported
because arrow was not compiled with the ipc_compression feature",
- compression_name
- );
- assert_eq!(err.to_string(), expected_error);
- });
- }
-
- #[test]
- #[cfg(feature = "ipc_compression")]
- fn read_generated_files_200() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- #[cfg(not(feature = "ipc_compression"))]
- fn read_generated_files_200_negative() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd",
"ZSTD")];
- cases.iter().for_each(|(path, compression_name)| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- let err = reader.next().unwrap().unwrap_err();
- let expected_error = format!(
- "Invalid argument error: compression type {} not supported
because arrow was not compiled with the ipc_compression feature",
- compression_name
- );
- assert_eq!(err.to_string(), expected_error);
- });
- }
fn create_test_projection_schema() -> Schema {
// define field types
@@ -1816,22 +1488,6 @@ mod tests {
check_union_with_builder(UnionBuilder::new_sparse());
}
- /// Read gzipped JSON file
- fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
- let testdata = crate::util::test_util::arrow_test_data();
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.json.gz",
- testdata, version, path
- ))
- .unwrap();
- let mut gz = GzDecoder::new(&file);
- let mut s = String::new();
- gz.read_to_string(&mut s).unwrap();
- // convert to Arrow JSON
- let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
- arrow_json
- }
-
#[test]
fn test_roundtrip_stream_nested_dict() {
let xs = vec!["AA", "BB", "AA", "CC", "BB"];
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index 826e49ca8..63f1520a5 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -1270,31 +1270,28 @@ mod tests {
use super::*;
use std::fs::File;
- use std::io::Read;
+ use std::io::Seek;
use std::sync::Arc;
- use flate2::read::GzDecoder;
use ipc::MetadataVersion;
use crate::array::*;
use crate::datatypes::Field;
use crate::ipc::reader::*;
- use crate::util::integration_util::*;
#[test]
#[cfg(feature = "ipc_compression")]
fn test_write_empty_record_batch_lz4_compression() {
- let file_name = "arrow_lz4_empty";
let schema = Schema::new(vec![Field::new("field1", DataType::Int32,
true)]);
let values: Vec<Option<i32>> = vec![];
let array = Int32Array::from(values);
let record_batch =
RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(array)])
.unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
{
- let file =
- File::create(format!("target/debug/testdata/{}.arrow_file",
file_name))
- .unwrap();
let write_option =
IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
.unwrap()
@@ -1302,15 +1299,14 @@ mod tests {
.unwrap();
let mut writer =
- FileWriter::try_new_with_options(file, &schema,
write_option).unwrap();
+ FileWriter::try_new_with_options(&mut file, &schema,
write_option)
+ .unwrap();
writer.write(&record_batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
{
// read file
- let file =
- File::open(format!("target/debug/testdata/{}.arrow_file",
file_name))
- .unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
loop {
match reader.next() {
@@ -1345,9 +1341,9 @@ mod tests {
let record_batch =
RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(array)])
.unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
{
- let file =
-
File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap();
let write_option =
IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
.unwrap()
@@ -1355,15 +1351,14 @@ mod tests {
.unwrap();
let mut writer =
- FileWriter::try_new_with_options(file, &schema,
write_option).unwrap();
+ FileWriter::try_new_with_options(&mut file, &schema,
write_option)
+ .unwrap();
writer.write(&record_batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
{
// read file
- let file =
- File::open(format!("target/debug/testdata/{}.arrow_file",
"arrow_lz4"))
- .unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
loop {
match reader.next() {
@@ -1398,9 +1393,8 @@ mod tests {
let record_batch =
RecordBatch::try_new(Arc::new(schema.clone()),
vec![Arc::new(array)])
.unwrap();
+ let mut file = tempfile::tempfile().unwrap();
{
- let file =
-
File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap();
let write_option =
IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
.unwrap()
@@ -1408,15 +1402,14 @@ mod tests {
.unwrap();
let mut writer =
- FileWriter::try_new_with_options(file, &schema,
write_option).unwrap();
+ FileWriter::try_new_with_options(&mut file, &schema,
write_option)
+ .unwrap();
writer.write(&record_batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
{
// read file
- let file =
- File::open(format!("target/debug/testdata/{}.arrow_file",
"arrow_zstd"))
- .unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
loop {
match reader.next() {
@@ -1462,18 +1455,16 @@ mod tests {
vec![Arc::new(array1) as ArrayRef],
)
.unwrap();
+ let mut file = tempfile::tempfile().unwrap();
{
- let file =
File::create("target/debug/testdata/arrow.arrow_file").unwrap();
- let mut writer = FileWriter::try_new(file, &schema).unwrap();
+ let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
{
- let file =
- File::open(format!("target/debug/testdata/{}.arrow_file",
"arrow"))
- .unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();
while let Some(Ok(read_batch)) = reader.next() {
read_batch
@@ -1569,352 +1560,6 @@ mod tests {
);
}
- #[test]
- #[cfg(not(feature = "force_validate"))]
- fn read_and_rewrite_generated_files_014() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read and rewrite the file to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- let mut writer = FileWriter::try_new(file,
&reader.schema()).unwrap();
- while let Some(Ok(batch)) = reader.next() {
- writer.write(&batch).unwrap();
- }
- writer.finish().unwrap();
- }
-
- let file = File::open(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- #[cfg(not(feature = "force_validate"))]
- fn read_and_rewrite_generated_streams_014() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "0.14.1";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_interval",
- "generated_datetime",
- "generated_dictionary",
- "generated_map",
- "generated_nested",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- "generated_decimal",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
-
- // read and rewrite the stream to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.stream",
- version, path
- ))
- .unwrap();
- let mut writer = StreamWriter::try_new(file,
&reader.schema()).unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
-
- let file =
- File::open(format!("target/debug/testdata/{}-{}.stream",
version, path))
- .unwrap();
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- fn read_and_rewrite_generated_files_100() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_custom_metadata",
- "generated_datetime",
- "generated_dictionary_unsigned",
- "generated_dictionary",
- // "generated_duplicate_fieldnames",
- "generated_interval",
- "generated_map",
- "generated_nested",
- // "generated_nested_large_offsets",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_large_offsets",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- // "generated_recursive_nested",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read and rewrite the file to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- // write IPC version 5
- let options =
- IpcWriteOptions::try_new(8, false,
ipc::MetadataVersion::V5).unwrap();
- let mut writer =
- FileWriter::try_new_with_options(file, &reader.schema(),
options)
- .unwrap();
- while let Some(Ok(batch)) = reader.next() {
- writer.write(&batch).unwrap();
- }
- writer.finish().unwrap();
- }
-
- let file = File::open(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- fn read_and_rewrite_generated_streams_100() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "1.0.0-littleendian";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec![
- "generated_custom_metadata",
- "generated_datetime",
- "generated_dictionary_unsigned",
- "generated_dictionary",
- // "generated_duplicate_fieldnames",
- "generated_interval",
- "generated_map",
- "generated_nested",
- // "generated_nested_large_offsets",
- "generated_null_trivial",
- "generated_null",
- "generated_primitive_large_offsets",
- "generated_primitive_no_batches",
- "generated_primitive_zerolength",
- "generated_primitive",
- // "generated_recursive_nested",
- ];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
-
- // read and rewrite the stream to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.stream",
- version, path
- ))
- .unwrap();
- let options =
- IpcWriteOptions::try_new(8, false,
ipc::MetadataVersion::V5).unwrap();
- let mut writer =
- StreamWriter::try_new_with_options(file, &reader.schema(),
options)
- .unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
-
- let file =
- File::open(format!("target/debug/testdata/{}-{}.stream",
version, path))
- .unwrap();
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- #[cfg(feature = "ipc_compression")]
- fn read_and_rewrite_compression_files_200() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
- testdata, version, path
- ))
- .unwrap();
-
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read and rewrite the file to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- // write IPC version 5
- let options =
- IpcWriteOptions::try_new(8, false,
ipc::MetadataVersion::V5)
- .unwrap()
-
.try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
- .unwrap();
-
- let mut writer =
- FileWriter::try_new_with_options(file, &reader.schema(),
options)
- .unwrap();
- while let Some(Ok(batch)) = reader.next() {
- writer.write(&batch).unwrap();
- }
- writer.finish().unwrap();
- }
-
- let file = File::open(format!(
- "target/debug/testdata/{}-{}.arrow_file",
- version, path
- ))
- .unwrap();
- let mut reader = FileReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- #[test]
- #[cfg(feature = "ipc_compression")]
- fn read_and_rewrite_compression_stream_200() {
- let testdata = crate::util::test_util::arrow_test_data();
- let version = "2.0.0-compression";
- // the test is repetitive, thus we can read all supported files at once
- let paths = vec!["generated_lz4", "generated_zstd"];
- paths.iter().for_each(|path| {
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.stream",
- testdata, version, path
- ))
- .unwrap();
-
- let reader = StreamReader::try_new(file, None).unwrap();
-
- // read and rewrite the stream to a temp location
- {
- let file = File::create(format!(
- "target/debug/testdata/{}-{}.stream",
- version, path
- ))
- .unwrap();
- let options =
- IpcWriteOptions::try_new(8, false,
ipc::MetadataVersion::V5)
- .unwrap()
- .try_with_compression(Some(ipc::CompressionType::ZSTD))
- .unwrap();
-
- let mut writer =
- StreamWriter::try_new_with_options(file, &reader.schema(),
options)
- .unwrap();
- reader.for_each(|batch| {
- writer.write(&batch.unwrap()).unwrap();
- });
- writer.finish().unwrap();
- }
-
- let file =
- File::open(format!("target/debug/testdata/{}-{}.stream",
version, path))
- .unwrap();
- let mut reader = StreamReader::try_new(file, None).unwrap();
-
- // read expected JSON output
- let arrow_json = read_gzip_json(version, path);
- assert!(arrow_json.equals_reader(&mut reader).unwrap());
- });
- }
-
- /// Read gzipped JSON file
- fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
- let testdata = crate::util::test_util::arrow_test_data();
- let file = File::open(format!(
- "{}/arrow-ipc-stream/integration/{}/{}.json.gz",
- testdata, version, path
- ))
- .unwrap();
- let mut gz = GzDecoder::new(&file);
- let mut s = String::new();
- gz.read_to_string(&mut s).unwrap();
- // convert to Arrow JSON
- let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
- arrow_json
- }
-
#[test]
fn track_union_nested_dict() {
let inner: DictionaryArray<Int32Type> = vec!["a", "b",
"a"].into_iter().collect();
@@ -1982,7 +1627,6 @@ mod tests {
#[test]
fn read_union_017() {
let testdata = crate::util::test_util::arrow_test_data();
- let version = "0.17.1";
let data_file = File::open(format!(
"{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
testdata,
@@ -1991,26 +1635,18 @@ mod tests {
let reader = StreamReader::try_new(data_file, None).unwrap();
+ let mut file = tempfile::tempfile().unwrap();
// read and rewrite the stream to a temp location
{
- let file = File::create(format!(
- "target/debug/testdata/{}-generated_union.stream",
- version
- ))
- .unwrap();
- let mut writer = StreamWriter::try_new(file,
&reader.schema()).unwrap();
+ let mut writer = StreamWriter::try_new(&mut file,
&reader.schema()).unwrap();
reader.for_each(|batch| {
writer.write(&batch.unwrap()).unwrap();
});
writer.finish().unwrap();
}
+ file.rewind().unwrap();
// Compare original file and rewrote file
- let file = File::open(format!(
- "target/debug/testdata/{}-generated_union.stream",
- version
- ))
- .unwrap();
let rewrite_reader = StreamReader::try_new(file, None).unwrap();
let data_file = File::open(format!(
@@ -2053,18 +1689,18 @@ mod tests {
vec![Arc::new(union) as ArrayRef],
)
.unwrap();
- let file_name = "target/debug/testdata/union.arrow_file";
+
+ let mut file = tempfile::tempfile().unwrap();
{
- let file = File::create(&file_name).unwrap();
let mut writer =
- FileWriter::try_new_with_options(file, &schema,
options).unwrap();
+ FileWriter::try_new_with_options(&mut file, &schema,
options).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
+ file.rewind().unwrap();
{
- let file = File::open(&file_name).unwrap();
let reader = FileReader::try_new(file, None).unwrap();
reader.for_each(|maybe_batch| {
maybe_batch
diff --git a/arrow/src/util/mod.rs b/arrow/src/util/mod.rs
index 1ee05d8a0..6f68398e7 100644
--- a/arrow/src/util/mod.rs
+++ b/arrow/src/util/mod.rs
@@ -24,8 +24,6 @@ pub mod bit_util;
#[cfg(feature = "test_utils")]
pub mod data_gen;
pub mod display;
-#[cfg(any(test, feature = "test_utils"))]
-pub mod integration_util;
#[cfg(feature = "prettyprint")]
pub mod pretty;
pub(crate) mod serialization;
diff --git a/dev/release/rat_exclude_files.txt
b/dev/release/rat_exclude_files.txt
index 466f6fa45..609a5851c 100644
--- a/dev/release/rat_exclude_files.txt
+++ b/dev/release/rat_exclude_files.txt
@@ -4,6 +4,7 @@ target/*
dev/release/rat_exclude_files.txt
arrow/test/data/*
arrow/test/dependency/*
+integration-testing/data/*
parquet_derive/test/dependency/*
.gitattributes
**.gitignore
diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml
index 786f77bd2..74a1ee6aa 100644
--- a/integration-testing/Cargo.toml
+++ b/integration-testing/Cargo.toml
@@ -36,12 +36,15 @@ arrow-flight = { path = "../arrow-flight", default-features
= false }
async-trait = { version = "0.1.41", default-features = false }
clap = { version = "3", default-features = false, features = ["std", "derive"]
}
futures = { version = "0.3", default-features = false }
-hex = { version = "0.4", default-features = false }
+hex = { version = "0.4", default-features = false, features = ["std"] }
prost = { version = "0.11", default-features = false }
-serde = { version = "1.0", default-features = false, features = ["rc"] }
-serde_derive = { version = "1.0", default-features = false }
+serde = { version = "1.0", default-features = false, features = ["rc",
"derive"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
tokio = { version = "1.0", default-features = false }
tonic = { version = "0.8", default-features = false }
tracing-subscriber = { version = "0.3.1", default-features = false, features =
["fmt"], optional = true }
num = { version = "0.4", default-features = false, features = ["std"] }
+flate2 = { version = "1", default-features = false, features =
["rust_backend"] }
+
+[dev-dependencies]
+tempfile = { version = "3", default-features = false }
diff --git a/arrow/test/data/integration.json
b/integration-testing/data/integration.json
similarity index 100%
rename from arrow/test/data/integration.json
rename to integration-testing/data/integration.json
diff --git a/integration-testing/src/bin/arrow-json-integration-test.rs
b/integration-testing/src/bin/arrow-json-integration-test.rs
index b442e8b5e..a7d7cf6ee 100644
--- a/integration-testing/src/bin/arrow-json-integration-test.rs
+++ b/integration-testing/src/bin/arrow-json-integration-test.rs
@@ -20,8 +20,7 @@ use arrow::datatypes::{DataType, Field};
use arrow::error::{ArrowError, Result};
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
-use arrow::util::integration_util::*;
-use arrow_integration_testing::read_json_file;
+use arrow_integration_testing::{read_json_file, util::*};
use clap::Parser;
use std::fs::File;
diff --git a/integration-testing/src/lib.rs b/integration-testing/src/lib.rs
index 5d3da15d3..ffe112af7 100644
--- a/integration-testing/src/lib.rs
+++ b/integration-testing/src/lib.rs
@@ -19,12 +19,12 @@
use serde_json::Value;
-use arrow::util::integration_util::ArrowJsonBatch;
+use util::*;
use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
-use arrow::util::integration_util::*;
+use arrow::util::test_util::arrow_test_data;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
@@ -36,6 +36,7 @@ pub const AUTH_PASSWORD: &str = "flight";
pub mod flight_client_scenarios;
pub mod flight_server_scenarios;
+pub mod util;
pub struct ArrowFile {
pub schema: Schema,
@@ -76,3 +77,22 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
batches,
})
}
+
+/// Read gzipped JSON test file
+pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
+ use flate2::read::GzDecoder;
+ use std::io::Read;
+
+ let testdata = arrow_test_data();
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.json.gz",
+ testdata, version, path
+ ))
+ .unwrap();
+ let mut gz = GzDecoder::new(&file);
+ let mut s = String::new();
+ gz.read_to_string(&mut s).unwrap();
+ // convert to Arrow JSON
+ let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
+ arrow_json
+}
diff --git a/arrow/src/util/integration_util.rs b/integration-testing/src/util.rs
similarity index 99%
rename from arrow/src/util/integration_util.rs
rename to integration-testing/src/util.rs
index c100a137c..e098c4e14 100644
--- a/arrow/src/util/integration_util.rs
+++ b/integration-testing/src/util.rs
@@ -22,19 +22,19 @@
use hex::decode;
use num::BigInt;
use num::Signed;
-use serde_derive::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
use serde_json::{Map as SJMap, Value};
use std::collections::HashMap;
use std::sync::Arc;
-use crate::array::*;
-use crate::buffer::{Buffer, MutableBuffer};
-use crate::compute;
-use crate::datatypes::*;
-use crate::error::{ArrowError, Result};
-use crate::record_batch::{RecordBatch, RecordBatchReader};
-use crate::util::bit_util;
-use crate::util::decimal::Decimal256;
+use arrow::array::*;
+use arrow::buffer::{Buffer, MutableBuffer};
+use arrow::compute;
+use arrow::datatypes::*;
+use arrow::error::{ArrowError, Result};
+use arrow::record_batch::{RecordBatch, RecordBatchReader};
+use arrow::util::bit_util;
+use arrow::util::decimal::Decimal256;
/// A struct that represents an Arrow file with a schema and record batches
#[derive(Deserialize, Serialize, Debug)]
@@ -1047,7 +1047,7 @@ mod tests {
use std::io::Read;
use std::sync::Arc;
- use crate::buffer::Buffer;
+ use arrow::buffer::Buffer;
#[test]
fn test_schema_equality() {
@@ -1112,7 +1112,6 @@ mod tests {
}
#[test]
- #[cfg_attr(miri, ignore)] // running forever
fn test_arrow_data_equality() {
let secs_tz = Some("Europe/Budapest".to_string());
let millis_tz = Some("America/New_York".to_string());
@@ -1333,7 +1332,7 @@ mod tests {
],
)
.unwrap();
- let mut file = File::open("test/data/integration.json").unwrap();
+ let mut file = File::open("data/integration.json").unwrap();
let mut json = String::new();
file.read_to_string(&mut json).unwrap();
let arrow_json: ArrowJson = serde_json::from_str(&json).unwrap();
diff --git a/integration-testing/tests/ipc_reader.rs b/integration-testing/tests/ipc_reader.rs
new file mode 100644
index 000000000..778d1ee77
--- /dev/null
+++ b/integration-testing/tests/ipc_reader.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::ipc::reader::{FileReader, StreamReader};
+use arrow::util::test_util::arrow_test_data;
+use arrow_integration_testing::read_gzip_json;
+use std::fs::File;
+
+#[test]
+fn read_generated_files_014() {
+ let testdata = arrow_test_data();
+ let version = "0.14.1";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ "generated_decimal",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+#[should_panic(expected = "Big Endian is not supported for Decimal!")]
+fn read_decimal_be_file_should_panic() {
+ let testdata = arrow_test_data();
+ let file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
+ testdata
+ ))
+ .unwrap();
+ FileReader::try_new(file, None).unwrap();
+}
+
+#[test]
+#[should_panic(
+ expected = "Last offset 687865856 of Utf8 is larger than values length 41"
+)]
+fn read_dictionary_be_not_implemented() {
+ // The offsets are not translated for big-endian files
+ // https://github.com/apache/arrow-rs/issues/859
+ let testdata = arrow_test_data();
+ let file = File::open(format!(
+        "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file",
+ testdata
+ ))
+ .unwrap();
+ FileReader::try_new(file, None).unwrap();
+}
+
+#[test]
+fn read_generated_be_files_should_work() {
+ // complementary to the previous test
+ let testdata = arrow_test_data();
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_map",
+ "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file",
+ testdata, path
+ ))
+ .unwrap();
+
+ FileReader::try_new(file, None).unwrap();
+ });
+}
+
+#[test]
+fn projection_should_work() {
+ // complementary to the previous test
+ let testdata = arrow_test_data();
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_map",
+ "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ // We must use littleendian files here.
+ // The offsets are not translated for big-endian files
+ // https://github.com/apache/arrow-rs/issues/859
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file",
+ testdata, path
+ ))
+ .unwrap();
+
+ let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
+ let datatype_0 = reader.schema().fields()[0].data_type().clone();
+ reader.for_each(|batch| {
+ let batch = batch.unwrap();
+ assert_eq!(batch.columns().len(), 1);
+            assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
+ });
+ });
+}
+
+#[test]
+fn read_generated_streams_014() {
+ let testdata = arrow_test_data();
+ let version = "0.14.1";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ "generated_decimal",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ // the next batch must be empty
+ assert!(reader.next().is_none());
+ // the stream must indicate that it's finished
+ assert!(reader.is_finished());
+ });
+}
+
+#[test]
+fn read_generated_files_100() {
+ let testdata = arrow_test_data();
+ let version = "1.0.0-littleendian";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ // "generated_map_non_canonical",
+ "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_generated_streams_100() {
+ let testdata = arrow_test_data();
+ let version = "1.0.0-littleendian";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ // "generated_map_non_canonical",
+ "generated_nested",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ // the next batch must be empty
+ assert!(reader.next().is_none());
+ // the stream must indicate that it's finished
+ assert!(reader.is_finished());
+ });
+}
+
+#[test]
+fn read_generated_streams_200() {
+ let testdata = arrow_test_data();
+ let version = "2.0.0-compression";
+
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ // the next batch must be empty
+ assert!(reader.next().is_none());
+ // the stream must indicate that it's finished
+ assert!(reader.is_finished());
+ });
+}
+
+#[test]
+fn read_generated_files_200() {
+ let testdata = arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
diff --git a/integration-testing/tests/ipc_writer.rs b/integration-testing/tests/ipc_writer.rs
new file mode 100644
index 000000000..0aa17cd05
--- /dev/null
+++ b/integration-testing/tests/ipc_writer.rs
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::ipc;
+use arrow::ipc::reader::{FileReader, StreamReader};
+use arrow::ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
+use arrow::util::test_util::arrow_test_data;
+use arrow_integration_testing::read_gzip_json;
+use std::fs::File;
+use std::io::Seek;
+
+#[test]
+fn read_and_rewrite_generated_files_014() {
+ let testdata = arrow_test_data();
+ let version = "0.14.1";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ "generated_decimal",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the file to a temp location
+ {
+            let mut writer = FileWriter::try_new(&mut file, &reader.schema()).unwrap();
+ while let Some(Ok(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
+ writer.finish().unwrap();
+ }
+ file.rewind().unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_and_rewrite_generated_streams_014() {
+ let testdata = arrow_test_data();
+ let version = "0.14.1";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_interval",
+ "generated_datetime",
+ "generated_dictionary",
+ "generated_map",
+ "generated_nested",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ "generated_decimal",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let reader = StreamReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the stream to a temp location
+ {
+            let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
+ reader.for_each(|batch| {
+ writer.write(&batch.unwrap()).unwrap();
+ });
+ writer.finish().unwrap();
+ }
+
+ file.rewind().unwrap();
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_and_rewrite_generated_files_100() {
+ let testdata = arrow_test_data();
+ let version = "1.0.0-littleendian";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_custom_metadata",
+ "generated_datetime",
+ "generated_dictionary_unsigned",
+ "generated_dictionary",
+ // "generated_duplicate_fieldnames",
+ "generated_interval",
+ "generated_map",
+ "generated_nested",
+ // "generated_nested_large_offsets",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_large_offsets",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ // "generated_recursive_nested",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the file to a temp location
+ {
+ // write IPC version 5
+ let options =
+                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap();
+ let mut writer =
+                FileWriter::try_new_with_options(&mut file, &reader.schema(), options)
+ .unwrap();
+ while let Some(Ok(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
+ writer.finish().unwrap();
+ }
+
+ file.rewind().unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_and_rewrite_generated_streams_100() {
+ let testdata = arrow_test_data();
+ let version = "1.0.0-littleendian";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec![
+ "generated_custom_metadata",
+ "generated_datetime",
+ "generated_dictionary_unsigned",
+ "generated_dictionary",
+ // "generated_duplicate_fieldnames",
+ "generated_interval",
+ "generated_map",
+ "generated_nested",
+ // "generated_nested_large_offsets",
+ "generated_null_trivial",
+ "generated_null",
+ "generated_primitive_large_offsets",
+ "generated_primitive_no_batches",
+ "generated_primitive_zerolength",
+ "generated_primitive",
+ // "generated_recursive_nested",
+ ];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let reader = StreamReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the stream to a temp location
+ {
+ let options =
+                IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap();
+ let mut writer =
+                StreamWriter::try_new_with_options(&mut file, &reader.schema(), options)
+ .unwrap();
+ reader.for_each(|batch| {
+ writer.write(&batch.unwrap()).unwrap();
+ });
+ writer.finish().unwrap();
+ }
+
+ file.rewind().unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_and_rewrite_compression_files_200() {
+ let testdata = arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the file to a temp location
+ {
+ // write IPC version 5
+            let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
+ .unwrap();
+
+ let mut writer =
+                FileWriter::try_new_with_options(&mut file, &reader.schema(), options)
+ .unwrap();
+ while let Some(Ok(batch)) = reader.next() {
+ writer.write(&batch).unwrap();
+ }
+ writer.finish().unwrap();
+ }
+
+ file.rewind().unwrap();
+ let mut reader = FileReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}
+
+#[test]
+fn read_and_rewrite_compression_stream_200() {
+ let testdata = arrow_test_data();
+ let version = "2.0.0-compression";
+ // the test is repetitive, thus we can read all supported files at once
+ let paths = vec!["generated_lz4", "generated_zstd"];
+ paths.iter().for_each(|path| {
+ let file = File::open(format!(
+ "{}/arrow-ipc-stream/integration/{}/{}.stream",
+ testdata, version, path
+ ))
+ .unwrap();
+
+ let reader = StreamReader::try_new(file, None).unwrap();
+
+ let mut file = tempfile::tempfile().unwrap();
+
+ // read and rewrite the stream to a temp location
+ {
+            let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
+ .unwrap()
+ .try_with_compression(Some(ipc::CompressionType::ZSTD))
+ .unwrap();
+
+ let mut writer =
+                StreamWriter::try_new_with_options(&mut file, &reader.schema(), options)
+ .unwrap();
+ reader.for_each(|batch| {
+ writer.write(&batch.unwrap()).unwrap();
+ });
+ writer.finish().unwrap();
+ }
+
+ file.rewind().unwrap();
+
+ let mut reader = StreamReader::try_new(file, None).unwrap();
+
+ // read expected JSON output
+ let arrow_json = read_gzip_json(version, path);
+ assert!(arrow_json.equals_reader(&mut reader).unwrap());
+ });
+}