This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new e0b3c9a ARROW-10297: [Rust] Parameter for parquet-read to output data
in json format, add "cli" feature to parquet crate
e0b3c9a is described below
commit e0b3c9af5f33df948af01e458542a25e73b24b1e
Author: Manoj Karthick <[email protected]>
AuthorDate: Sun Jan 31 12:18:08 2021 -0800
ARROW-10297: [Rust] Parameter for parquet-read to output data in json
format, add "cli" feature to parquet crate
Add an option to print output in JSON format in the parquet-read binary.
Having JSON output allows for easy analysis using tools like
[jq](https://stedolan.github.io/jq/). This PR builds on the changes implemented
in https://github.com/apache/arrow/pull/8686 and incorporates the suggestions
in that PR.
**Changelog**
* Update all three binaries `parquet-schema`, `parquet-rowcount` and
`parquet-read` to use [clap](https://github.com/clap-rs/clap) for argument
parsing
* Add `to_json_value()` method to get `serde_json::Value` from `Row` and
`Field` structs (Thanks to @jhorstmann for these changes!)
* parquet-schema:
* Convert verbose argument into `-v/--verbose` flag
* parquet-read:
* Add a new flag `-j/--json` that prints the file contents in JSON lines
format
* The feature is gated under the `cli` cargo feature
* Update documentation and README with instructions for running
* The binaries now use version and author information as defined in
Cargo.toml
Example output:
```
❯ parquet-read cities.parquet 3 --json
{"continent":"Europe","country":{"name":"France","city":["Paris","Nice","Marseilles","Cannes"]}}
{"continent":"Europe","country":{"name":"Greece","city":["Athens","Piraeus","Hania","Heraklion","Rethymnon","Fira"]}}
{"continent":"North
America","country":{"name":"Canada","city":["Toronto","Vancouver","St.
John's","Saint
John","Montreal","Halifax","Winnipeg","Calgary","Saskatoon","Ottawa","Yellowknife"]}}
```
Closes #9306 from manojkarthick/rust-parquet-bin-clap
Authored-by: Manoj Karthick <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
rust/parquet/Cargo.toml | 15 +++
rust/parquet/README.md | 15 ++-
rust/parquet/src/bin/parquet-read.rs | 75 ++++++++++---
rust/parquet/src/bin/parquet-rowcount.rs | 36 ++++--
rust/parquet/src/bin/parquet-schema.rs | 62 +++++++----
rust/parquet/src/record/api.rs | 183 ++++++++++++++++++++++++++++++-
6 files changed, 322 insertions(+), 64 deletions(-)
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 11c35a7..f7be4ee 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -43,6 +43,8 @@ chrono = "0.4"
num-bigint = "0.3"
arrow = { path = "../arrow", version = "4.0.0-SNAPSHOT", optional = true }
base64 = { version = "0.12", optional = true }
+clap = { version = "2.33.3", optional = true }
+serde_json = { version = "1.0", features = ["preserve_order"], optional = true
}
[dev-dependencies]
rand = "0.8"
@@ -56,3 +58,16 @@ serde_json = { version = "1.0", features =
["preserve_order"] }
[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+cli = ["serde_json", "base64", "clap"]
+
+[[ bin ]]
+name = "parquet-read"
+required-features = ["cli"]
+
+[[ bin ]]
+name = "parquet-schema"
+required-features = ["cli"]
+
+[[ bin ]]
+name = "parquet-rowcount"
+required-features = ["cli"]
diff --git a/rust/parquet/README.md b/rust/parquet/README.md
index cac8ac4..252312a 100644
--- a/rust/parquet/README.md
+++ b/rust/parquet/README.md
@@ -79,23 +79,22 @@ enabled by adding `RUSTFLAGS="-C target-feature=+sse4.2"`
before the
`cargo build` command.
## Test
-Run `cargo test` for unit tests.
+Run `cargo test` for unit tests. To also run tests related to the binaries,
use `cargo test --features cli`.
## Binaries
-The following binaries are provided (use `cargo install` to install them):
+The following binaries are provided (use `cargo install --features cli` to
install them):
- **parquet-schema** for printing Parquet file schema and metadata.
-`Usage: parquet-schema <file-path> [verbose]`, where `file-path` is the path
to a Parquet file,
-and optional `verbose` is the boolean flag that allows to print full metadata
or schema only
-(when not specified only schema will be printed).
+`Usage: parquet-schema <file-path>`, where `file-path` is the path to a
Parquet file. Use `-v/--verbose` flag
+to print full metadata or schema only (when not specified only schema will be
printed).
- **parquet-read** for reading records from a Parquet file.
`Usage: parquet-read <file-path> [num-records]`, where `file-path` is the path
to a Parquet file,
and `num-records` is the number of records to read from a file (when not
specified all records will
-be printed).
+be printed). Use `-j/--json` to print records in JSON lines format.
- **parquet-rowcount** for reporting the number of records in one or more
Parquet files.
-`Usage: parquet-rowcount <file-path> ...`, where `file-path` is the path to a
Parquet file, and `...`
-indicates any number of additional parquet files.
+`Usage: parquet-rowcount <file-paths>...`, where `<file-paths>...` is a space
separated list of one or more
+files to read.
If you see `Library not loaded` error, please make sure `LD_LIBRARY_PATH` is
set properly:
```
diff --git a/rust/parquet/src/bin/parquet-read.rs
b/rust/parquet/src/bin/parquet-read.rs
index c86b26e..aa3b827 100644
--- a/rust/parquet/src/bin/parquet-read.rs
+++ b/rust/parquet/src/bin/parquet-read.rs
@@ -34,39 +34,72 @@
//! ```
//!
//! # Usage
-//!
//! ```
//! parquet-read <file-path> [num-records]
//! ```
-//! where `file-path` is the path to a Parquet file and `num-records` is the
optional
-//! numeric option that allows to specify number of records to read from a
file.
-//! When not provided, all records are read.
+//!
+//! ## Flags
+//! -h, --help Prints help information
+//! -j, --json Print Parquet file in JSON lines Format
+//! -V, --version Prints version information
+//!
+//! ## Args
+//! <file-path> Path to a Parquet file
+//! <num-records> Number of records to read. When not provided, all
records are read.
//!
//! Note that `parquet-read` reads full file schema, no projection or
filtering is
//! applied.
extern crate parquet;
-use std::{env, fs::File, path::Path, process};
+use std::{env, fs::File, path::Path};
+
+use clap::{crate_authors, crate_version, App, Arg};
use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::record::Row;
fn main() {
- let args: Vec<String> = env::args().collect();
- if args.len() != 2 && args.len() != 3 {
- println!("Usage: parquet-read <file-path> [num-records]");
- process::exit(1);
- }
+ let app = App::new("parquet-read")
+ .version(crate_version!())
+ .author(crate_authors!())
+ .about("Read data from a Parquet file and print output in console, in
either built-in or JSON format")
+ .arg(
+ Arg::with_name("file_path")
+ .value_name("file-path")
+ .required(true)
+ .index(1)
+ .help("Path to a parquet file"),
+ )
+ .arg(
+ Arg::with_name("num_records")
+ .value_name("num-records")
+ .index(2)
+ .help(
+ "Number of records to read. When not provided, all records
are read.",
+ ),
+ )
+ .arg(
+ Arg::with_name("json")
+ .short("j")
+ .long("json")
+ .takes_value(false)
+ .help("Print Parquet file in JSON lines format"),
+ );
- let mut num_records: Option<usize> = None;
- if args.len() == 3 {
- match args[2].parse() {
- Ok(value) => num_records = Some(value),
+ let matches = app.get_matches();
+ let filename = matches.value_of("file_path").unwrap();
+ let num_records: Option<usize> = if matches.is_present("num_records") {
+ match matches.value_of("num_records").unwrap().parse() {
+ Ok(value) => Some(value),
Err(e) => panic!("Error when reading value for [num-records], {}",
e),
}
- }
+ } else {
+ None
+ };
- let path = Path::new(&args[1]);
+ let json = matches.is_present("json");
+ let path = Path::new(&filename);
let file = File::open(&path).unwrap();
let parquet_reader = SerializedFileReader::new(file).unwrap();
@@ -79,9 +112,17 @@ fn main() {
while all_records || start < end {
match iter.next() {
- Some(row) => println!("{}", row),
+ Some(row) => print_row(&row, json),
None => break,
}
start += 1;
}
}
+
+fn print_row(row: &Row, json: bool) {
+ if json {
+ println!("{}", row.to_json_value())
+ } else {
+ println!("{}", row.to_string());
+ }
+}
diff --git a/rust/parquet/src/bin/parquet-rowcount.rs
b/rust/parquet/src/bin/parquet-rowcount.rs
index 23dc503..3c61bab 100644
--- a/rust/parquet/src/bin/parquet-rowcount.rs
+++ b/rust/parquet/src/bin/parquet-rowcount.rs
@@ -34,30 +34,44 @@
//! ```
//!
//! # Usage
-//!
//! ```
-//! parquet-rowcount <file-path> ...
+//! parquet-rowcount <file-paths>...
//! ```
-//! where `file-path` is the path to a Parquet file and `...` is any
additional number of
-//! parquet files to count the number of rows from.
+//!
+//! ## Flags
+//! -h, --help Prints help information
+//! -V, --version Prints version information
+//!
+//! ## Args
+//! <file-paths>... List of Parquet files to read from
//!
//! Note that `parquet-rowcount` reads full file schema, no projection or
filtering is
//! applied.
extern crate parquet;
-use std::{env, fs::File, path::Path, process};
+use std::{env, fs::File, path::Path};
+
+use clap::{crate_authors, crate_version, App, Arg};
use parquet::file::reader::{FileReader, SerializedFileReader};
fn main() {
- let args: Vec<String> = env::args().collect();
- if args.len() < 2 {
- println!("Usage: parquet-rowcount <file-path> ...");
- process::exit(1);
- }
+ let matches = App::new("parquet-rowcount")
+ .version(crate_version!())
+ .author(crate_authors!())
+ .about("Return number of rows in Parquet file")
+ .arg(
+ Arg::with_name("file_paths")
+ .value_name("file-paths")
+ .required(true)
+ .multiple(true)
+ .help("List of Parquet files to read from separated by space"),
+ )
+ .get_matches();
- for filename in &args[1..] {
+ let filenames: Vec<&str> =
matches.values_of("file_paths").unwrap().collect();
+ for filename in &filenames {
let path = Path::new(filename);
let file = File::open(path).unwrap();
let parquet_reader = SerializedFileReader::new(file).unwrap();
diff --git a/rust/parquet/src/bin/parquet-schema.rs
b/rust/parquet/src/bin/parquet-schema.rs
index cff6d0c..1b80637 100644
--- a/rust/parquet/src/bin/parquet-schema.rs
+++ b/rust/parquet/src/bin/parquet-schema.rs
@@ -34,17 +34,26 @@
//! ```
//!
//! # Usage
-//!
//! ```
-//! parquet-schema <file-path> [verbose]
+//! parquet-schema [FLAGS] <file-path>
//! ```
-//! where `file-path` is the path to a Parquet file and `verbose` is the
optional boolean
-//! flag that allows to print schema only, when set to `false` (default
behaviour when
-//! not provided), or print full file metadata, when set to `true`.
+//!
+//! ## Flags
+//! -h, --help Prints help information
+//! -V, --version Prints version information
+//! -v, --verbose Enable printing full file metadata
+//!
+//! ## Args
+//! <file-path> Path to a Parquet file
+//!
+//! Note that `verbose` is an optional boolean flag that allows to print
schema only,
+//! when not provided or print full file metadata when provided.
extern crate parquet;
-use std::{env, fs::File, path::Path, process};
+use std::{env, fs::File, path::Path};
+
+use clap::{crate_authors, crate_version, App, Arg};
use parquet::{
file::reader::{FileReader, SerializedFileReader},
@@ -52,31 +61,38 @@ use parquet::{
};
fn main() {
- let args: Vec<String> = env::args().collect();
- if args.len() != 2 && args.len() != 3 {
- println!("Usage: parquet-schema <file-path> [verbose]");
- process::exit(1);
- }
- let path = Path::new(&args[1]);
- let mut verbose = false;
- if args.len() == 3 {
- match args[2].parse() {
- Ok(b) => verbose = b,
- Err(e) => panic!(
- "Error when reading value for [verbose] (expected either
'true' or 'false'): {}",
- e
- ),
- }
- }
+ let matches = App::new("parquet-schema")
+ .version(crate_version!())
+ .author(crate_authors!())
+ .arg(
+ Arg::with_name("file_path")
+ .value_name("file-path")
+ .required(true)
+ .index(1)
+ .help("Path to a Parquet file"),
+ )
+ .arg(
+ Arg::with_name("verbose")
+ .short("v")
+ .long("verbose")
+ .takes_value(false)
+ .help("Enable printing full file metadata"),
+ )
+ .get_matches();
+
+ let filename = matches.value_of("file_path").unwrap();
+ let path = Path::new(&filename);
let file = match File::open(&path) {
Err(e) => panic!("Error when opening file {}: {}", path.display(), e),
Ok(f) => f,
};
+ let verbose = matches.is_present("verbose");
+
match SerializedFileReader::new(file) {
Err(e) => panic!("Error when parsing Parquet file: {}", e),
Ok(parquet_reader) => {
let metadata = parquet_reader.metadata();
- println!("Metadata for file: {}", &args[1]);
+ println!("Metadata for file: {}", &filename);
println!();
if verbose {
print_parquet_metadata(&mut std::io::stdout(), &metadata);
diff --git a/rust/parquet/src/record/api.rs b/rust/parquet/src/record/api.rs
index 61a5280..9e131b4 100644
--- a/rust/parquet/src/record/api.rs
+++ b/rust/parquet/src/record/api.rs
@@ -19,7 +19,7 @@
use std::fmt;
-use chrono::{Local, TimeZone};
+use chrono::{TimeZone, Utc};
use num_bigint::{BigInt, Sign};
use crate::basic::{LogicalType, Type as PhysicalType};
@@ -27,6 +27,9 @@ use crate::data_type::{ByteArray, Decimal, Int96};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
+#[cfg(feature = "cli")]
+use serde_json::Value;
+
/// Macro as a shortcut to generate 'not yet implemented' panic error.
macro_rules! nyi {
($column_descr:ident, $value:ident) => {{
@@ -75,6 +78,16 @@ impl Row {
count: self.fields.len(),
}
}
+
+ #[cfg(feature = "cli")]
+ pub fn to_json_value(&self) -> Value {
+ Value::Object(
+ self.fields
+ .iter()
+ .map(|(key, field)| (key.to_owned(), field.to_json_value()))
+ .collect(),
+ )
+ }
}
pub struct RowColumnIter<'a> {
@@ -635,6 +648,55 @@ impl Field {
_ => nyi!(descr, value),
}
}
+
+ #[cfg(feature = "cli")]
+ pub fn to_json_value(&self) -> Value {
+ match &self {
+ Field::Null => Value::Null,
+ Field::Bool(b) => Value::Bool(*b),
+ Field::Byte(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::Short(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::Int(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::Long(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::UByte(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::UShort(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::UInt(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::ULong(n) => Value::Number(serde_json::Number::from(*n)),
+ Field::Float(n) => serde_json::Number::from_f64(f64::from(*n))
+ .map(Value::Number)
+ .unwrap_or(Value::Null),
+ Field::Double(n) => serde_json::Number::from_f64(*n)
+ .map(Value::Number)
+ .unwrap_or(Value::Null),
+ Field::Decimal(n) => Value::String(convert_decimal_to_string(&n)),
+ Field::Str(s) => Value::String(s.to_owned()),
+ Field::Bytes(b) => Value::String(base64::encode(b.data())),
+ Field::Date(d) => Value::String(convert_date_to_string(*d)),
+ Field::TimestampMillis(ts) => {
+ Value::String(convert_timestamp_millis_to_string(*ts))
+ }
+ Field::TimestampMicros(ts) => {
+ Value::String(convert_timestamp_micros_to_string(*ts))
+ }
+ Field::Group(row) => row.to_json_value(),
+ Field::ListInternal(fields) => {
+ Value::Array(fields.elements.iter().map(|f|
f.to_json_value()).collect())
+ }
+ Field::MapInternal(map) => Value::Object(
+ map.entries
+ .iter()
+ .map(|(key_field, value_field)| {
+ let key_val = key_field.to_json_value();
+ let key_str = key_val
+ .as_str()
+ .map(|s| s.to_owned())
+ .unwrap_or_else(|| key_val.to_string());
+ (key_str, value_field.to_json_value())
+ })
+ .collect(),
+ ),
+ }
+ }
}
impl fmt::Display for Field {
@@ -711,7 +773,7 @@ impl fmt::Display for Field {
#[inline]
fn convert_date_to_string(value: u32) -> String {
static NUM_SECONDS_IN_DAY: i64 = 60 * 60 * 24;
- let dt = Local.timestamp(value as i64 * NUM_SECONDS_IN_DAY, 0).date();
+ let dt = Utc.timestamp(value as i64 * NUM_SECONDS_IN_DAY, 0).date();
format!("{}", dt.format("%Y-%m-%d %:z"))
}
@@ -720,7 +782,7 @@ fn convert_date_to_string(value: u32) -> String {
/// Datetime is displayed in local timezone.
#[inline]
fn convert_timestamp_millis_to_string(value: u64) -> String {
- let dt = Local.timestamp((value / 1000) as i64, 0);
+ let dt = Utc.timestamp((value / 1000) as i64, 0);
format!("{}", dt.format("%Y-%m-%d %H:%M:%S %:z"))
}
@@ -984,7 +1046,7 @@ mod tests {
fn test_convert_date_to_string() {
fn check_date_conversion(y: u32, m: u32, d: u32) {
let datetime = chrono::NaiveDate::from_ymd(y as i32, m,
d).and_hms(0, 0, 0);
- let dt = Local.from_utc_datetime(&datetime);
+ let dt = Utc.from_utc_datetime(&datetime);
let res = convert_date_to_string((dt.timestamp() / 60 / 60 / 24)
as u32);
let exp = format!("{}", dt.format("%Y-%m-%d %:z"));
assert_eq!(res, exp);
@@ -1001,7 +1063,7 @@ mod tests {
fn test_convert_timestamp_to_string() {
fn check_datetime_conversion(y: u32, m: u32, d: u32, h: u32, mi: u32,
s: u32) {
let datetime = chrono::NaiveDate::from_ymd(y as i32, m,
d).and_hms(h, mi, s);
- let dt = Local.from_utc_datetime(&datetime);
+ let dt = Utc.from_utc_datetime(&datetime);
let res = convert_timestamp_millis_to_string(dt.timestamp_millis()
as u64);
let exp = format!("{}", dt.format("%Y-%m-%d %H:%M:%S %:z"));
assert_eq!(res, exp);
@@ -1608,6 +1670,117 @@ mod tests {
);
}
}
+
+ #[test]
+ #[cfg(feature = "cli")]
+ fn test_to_json_value() {
+ assert_eq!(Field::Null.to_json_value(), Value::Null);
+ assert_eq!(Field::Bool(true).to_json_value(), Value::Bool(true));
+ assert_eq!(Field::Bool(false).to_json_value(), Value::Bool(false));
+ assert_eq!(
+ Field::Byte(1).to_json_value(),
+ Value::Number(serde_json::Number::from(1))
+ );
+ assert_eq!(
+ Field::Short(2).to_json_value(),
+ Value::Number(serde_json::Number::from(2))
+ );
+ assert_eq!(
+ Field::Int(3).to_json_value(),
+ Value::Number(serde_json::Number::from(3))
+ );
+ assert_eq!(
+ Field::Long(4).to_json_value(),
+ Value::Number(serde_json::Number::from(4))
+ );
+ assert_eq!(
+ Field::UByte(1).to_json_value(),
+ Value::Number(serde_json::Number::from(1))
+ );
+ assert_eq!(
+ Field::UShort(2).to_json_value(),
+ Value::Number(serde_json::Number::from(2))
+ );
+ assert_eq!(
+ Field::UInt(3).to_json_value(),
+ Value::Number(serde_json::Number::from(3))
+ );
+ assert_eq!(
+ Field::ULong(4).to_json_value(),
+ Value::Number(serde_json::Number::from(4))
+ );
+ assert_eq!(
+ Field::Float(5.0).to_json_value(),
+ Value::Number(serde_json::Number::from_f64(f64::from(5.0 as
f32)).unwrap())
+ );
+ assert_eq!(
+ Field::Float(5.1234).to_json_value(),
+ Value::Number(
+ serde_json::Number::from_f64(f64::from(5.1234 as f32)).unwrap()
+ )
+ );
+ assert_eq!(
+ Field::Double(6.0).to_json_value(),
+ Value::Number(serde_json::Number::from_f64(6.0 as f64).unwrap())
+ );
+ assert_eq!(
+ Field::Double(6.1234).to_json_value(),
+ Value::Number(serde_json::Number::from_f64(6.1234 as f64).unwrap())
+ );
+ assert_eq!(
+ Field::Str("abc".to_string()).to_json_value(),
+ Value::String(String::from("abc"))
+ );
+ assert_eq!(
+ Field::Decimal(Decimal::from_i32(4, 8, 2)).to_json_value(),
+ Value::String(String::from("0.04"))
+ );
+ assert_eq!(
+ Field::Bytes(ByteArray::from(vec![1, 2, 3])).to_json_value(),
+ Value::String(String::from("AQID"))
+ );
+ assert_eq!(
+ Field::TimestampMillis(12345678).to_json_value(),
+ Value::String("1970-01-01 03:25:45 +00:00".to_string())
+ );
+ assert_eq!(
+ Field::TimestampMicros(12345678901).to_json_value(),
+ Value::String(convert_timestamp_micros_to_string(12345678901))
+ );
+
+ let fields = vec![
+ ("X".to_string(), Field::Int(1)),
+ ("Y".to_string(), Field::Double(2.2)),
+ ("Z".to_string(), Field::Str("abc".to_string())),
+ ];
+ let row = Field::Group(make_row(fields));
+ assert_eq!(
+ row.to_json_value(),
+ serde_json::json!({"X": 1, "Y": 2.2, "Z": "abc"})
+ );
+
+ let row = Field::ListInternal(make_list(vec![
+ Field::Int(1),
+ Field::Int(12),
+ Field::Null,
+ ]));
+ let array = vec![
+ Value::Number(serde_json::Number::from(1)),
+ Value::Number(serde_json::Number::from(12)),
+ Value::Null,
+ ];
+ assert_eq!(row.to_json_value(), Value::Array(array));
+
+ let row = Field::MapInternal(make_map(vec![
+ (Field::Str("k1".to_string()), Field::Double(1.2)),
+ (Field::Str("k2".to_string()), Field::Double(3.4)),
+ (Field::Str("k3".to_string()), Field::Double(4.5)),
+ ]));
+ assert_eq!(
+ row.to_json_value(),
+ serde_json::json!({"k1": 1.2, "k2": 3.4, "k3": 4.5})
+ );
+ }
}
#[cfg(test)]